1
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2026-04-09 10:10:55 +02:00

Compare commits

..

1 Commits

522 changed files with 17138 additions and 41026 deletions

View File

@@ -1,15 +0,0 @@
name: Full CI
on: [push]
jobs:
formatter:
uses: ./.github/workflows/formatter.yml
unit-test:
uses: ./.github/workflows/pytest.yml
unit-test-matrix:
uses: ./.github/workflows/pytest-matrix.yml
end2end-test:
uses: ./.github/workflows/end2end-conda.yml

View File

@@ -1,48 +0,0 @@
name: Run Pytest with Coverage
on: [workflow_call]
jobs:
pytest:
runs-on: ubuntu-latest
defaults:
run:
shell: bash -el {0}
env:
CHILD_PIPELINE_BRANCH: main # Set the branch you want for ophyd_devices
BEC_CORE_BRANCH: main # Set the branch you want for bec
OPHYD_DEVICES_BRANCH: main # Set the branch you want for ophyd_devices
PROJECT_PATH: ${{ github.repository }}
QTWEBENGINE_DISABLE_SANDBOX: 1
QT_QPA_PLATFORM: "offscreen"
steps:
- uses: actions/checkout@v4
- name: Set up Conda
uses: conda-incubator/setup-miniconda@v3
with:
auto-update-conda: true
auto-activate-base: true
python-version: '3.11'
- name: Install dependencies
run: |
sudo apt-get update
sudo apt-get install -y libgl1 libegl1 x11-utils libxkbcommon-x11-0 libdbus-1-3 xvfb
sudo apt-get -y install libnss3 libxdamage1 libasound2t64 libatomic1 libxcursor1
- name: Conda install and run pytest
run: |
echo -e "\033[35;1m Using branch $BEC_CORE_BRANCH of BEC CORE \033[0;m";
git clone --branch $BEC_CORE_BRANCH https://gitlab.psi.ch/bec/bec.git
echo -e "\033[35;1m Using branch $OPHYD_DEVICES_BRANCH of OPHYD_DEVICES \033[0;m";
git clone --branch $OPHYD_DEVICES_BRANCH https://gitlab.psi.ch/bec/ophyd_devices.git
export OHPYD_DEVICES_PATH=$PWD/ophyd_devices
cd ./bec
conda create -q -n test-environment python=3.11
source ./bin/install_bec_dev.sh -t
cd ../
pip install -e ./ophyd_devices
pip install -e .[dev,pyside6]
pytest -v --files-path ./ --start-servers --random-order ./tests/end-2-end

View File

@@ -1,61 +0,0 @@
name: Formatter and Pylint jobs
on: [workflow_call]
jobs:
Formatter:
runs-on: ubuntu-latest
steps:
- name: Check out repository code
uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.13'
- name: Run black and isort
run: |
pip install black isort
pip install -e .[dev]
black --check --diff --color .
isort --check --diff ./
Pylint:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.13'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install pylint pylint-exit anybadge
- name: Run Pylint
run: |
mkdir -p ./pylint
set +e
pylint ./${{ github.event.repository.name }} --output-format=text > ./pylint/pylint.log
pylint-exit $?
set -e
- name: Extract Pylint Score
id: score
run: |
SCORE=$(sed -n 's/^Your code has been rated at \([-0-9.]*\)\/.*/\1/p' ./pylint/pylint.log)
echo "score=$SCORE" >> $GITHUB_OUTPUT
- name: Create Badge
run: |
anybadge --label=Pylint --file=./pylint/pylint.svg --value="${{ steps.score.outputs.score }}" 2=red 4=orange 8=yellow 10=green
- name: Upload Artifacts
uses: actions/upload-artifact@v4
with:
name: pylint-artifacts
path: |
# ./pylint/pylint.log # not sure why this isn't working
./pylint/pylint.svg

View File

@@ -1,48 +0,0 @@
name: Run Pytest with different Python versions
on: [workflow_call]
jobs:
pytest-matrix:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.10", "3.11", "3.12"]
env:
CHILD_PIPELINE_BRANCH: main # Set the branch you want for ophyd_devices
BEC_CORE_BRANCH: main # Set the branch you want for bec
OPHYD_DEVICES_BRANCH: main # Set the branch you want for ophyd_devices
PROJECT_PATH: ${{ github.repository }}
QTWEBENGINE_DISABLE_SANDBOX: 1
QT_QPA_PLATFORM: "offscreen"
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
sudo apt-get update
sudo apt-get install -y libgl1 libegl1 x11-utils libxkbcommon-x11-0 libdbus-1-3 xvfb
sudo apt-get -y install libnss3 libxdamage1 libasound2t64 libatomic1 libxcursor1
- name: Clone and install dependencies
run: |
echo -e "\033[35;1m Using branch $BEC_CORE_BRANCH of BEC CORE \033[0;m";
git clone --branch $BEC_CORE_BRANCH https://gitlab.psi.ch/bec/bec.git
echo -e "\033[35;1m Using branch $OPHYD_DEVICES_BRANCH of OPHYD_DEVICES \033[0;m";
git clone --branch $OPHYD_DEVICES_BRANCH https://gitlab.psi.ch/bec/ophyd_devices.git
export OHPYD_DEVICES_PATH=$PWD/ophyd_devices
pip install -e ./ophyd_devices
pip install -e ./bec/bec_lib[dev]
pip install -e ./bec/bec_ipython_client
pip install -e .[dev,pyside6]
- name: Run Pytest
run: |
pip install pytest pytest-random-order
pytest -v --maxfail=2 --junitxml=report.xml --random-order ./tests/unit_tests

View File

@@ -1,47 +0,0 @@
name: Run Pytest with Coverage
on: [workflow_call]
jobs:
pytest:
runs-on: ubuntu-latest
env:
CHILD_PIPELINE_BRANCH: main # Set the branch you want for ophyd_devices
BEC_CORE_BRANCH: main # Set the branch you want for bec
OPHYD_DEVICES_BRANCH: main # Set the branch you want for ophyd_devices
PROJECT_PATH: ${{ github.repository }}
QTWEBENGINE_DISABLE_SANDBOX: 1
QT_QPA_PLATFORM: "offscreen"
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Install dependencies
run: |
sudo apt-get update
sudo apt-get install -y libgl1 libegl1 x11-utils libxkbcommon-x11-0 libdbus-1-3 xvfb
sudo apt-get -y install libnss3 libxdamage1 libasound2t64 libatomic1 libxcursor1
- name: Clone and install dependencies
run: |
echo -e "\033[35;1m Using branch $BEC_CORE_BRANCH of BEC CORE \033[0;m";
git clone --branch $BEC_CORE_BRANCH https://gitlab.psi.ch/bec/bec.git
echo -e "\033[35;1m Using branch $OPHYD_DEVICES_BRANCH of OPHYD_DEVICES \033[0;m";
git clone --branch $OPHYD_DEVICES_BRANCH https://gitlab.psi.ch/bec/ophyd_devices.git
export OHPYD_DEVICES_PATH=$PWD/ophyd_devices
pip install -e ./ophyd_devices
pip install -e ./bec/bec_lib[dev]
pip install -e ./bec/bec_ipython_client
pip install -e .[dev,pyside6]
- name: Run Pytest with Coverage
run: |
pip install coverage pytest pytest-random-order
coverage run --source=./bec_widgets -m pytest -v --junitxml=report.xml --maxfail=2 --random-order --full-trace ./tests/unit_tests
coverage report
coverage xml

View File

@@ -12,9 +12,6 @@ variables:
description: ophyd_devices branch
value: main
CHILD_PIPELINE_BRANCH: $CI_DEFAULT_BRANCH
CHECK_PKG_VERSIONS:
description: Whether to run additional tests against min/max/random selection of dependencies. Set to 1 for running.
value: 0
workflow:
rules:
@@ -34,8 +31,8 @@ include:
inputs:
stage: test
path: "."
pytest_args: "-v,--random-order,tests/unit_tests"
pip_args: ".[dev]"
pytest_args: "-v --random-order tests/"
exclude_packages: ""
# different stages in the pipeline
stages:
@@ -60,7 +57,6 @@ stages:
- pip install -e ./ophyd_devices
- pip install -e ./bec/bec_lib[dev]
- pip install -e ./bec/bec_ipython_client
- pip install -e ./bec/pytest_bec_e2e
.install-os-packages: &install-os-packages
- apt-get update
@@ -77,9 +73,9 @@ formatter:
stage: Formatter
needs: []
script:
- pip install -e ./[dev]
- isort --check --diff --line-length=100 --profile=black --multi-line=3 --trailing-comma ./
- black --check --diff --color --line-length=100 --skip-magic-trailing-comma ./
- pip install black isort
- isort --check --diff ./
- black --check --diff --color ./
rules:
- if: $CI_PROJECT_PATH == "bec/bec_widgets"
@@ -88,7 +84,7 @@ pylint:
needs: []
before_script:
- pip install pylint pylint-exit anybadge
- pip install -e .[dev]
- pip install -e .[dev,pyqt6]
script:
- mkdir ./pylint
- pylint ./bec_widgets --output-format=text --output=./pylint/pylint.log | tee ./pylint/pylint.log || pylint-exit $?
@@ -147,7 +143,7 @@ tests:
- *clone-repos
- *install-os-packages
- *install-repos
- pip install -e .[dev,pyside6]
- pip install -e .[dev,pyqt6]
- coverage run --source=./bec_widgets -m pytest -v --junitxml=report.xml --maxfail=2 --random-order --full-trace ./tests/unit_tests
- coverage report
- coverage xml
@@ -162,20 +158,6 @@ tests:
- tests/reference_failures/
when: always
generate-client-check:
stage: test
needs: []
variables:
QT_QPA_PLATFORM: "offscreen"
script:
- *clone-repos
- *install-os-packages
- *install-repos
- pip install -e .[dev,pyside6]
- bw-generate-cli --target bec_widgets
# if there are changes in the generated files, fail the job
- git diff --exit-code
test-matrix:
parallel:
matrix:
@@ -185,6 +167,7 @@ test-matrix:
- "3.12"
QT_PCKG:
- "pyside6"
- "pyqt6"
stage: AdditionalTests
needs: []
@@ -203,20 +186,14 @@ test-matrix:
end-2-end-conda:
stage: End2End
needs: []
image: continuumio/miniconda3:25.1.1-2
image: continuumio/miniconda3
allow_failure: false
variables:
QT_QPA_PLATFORM: "offscreen"
script:
- *clone-repos
- *install-os-packages
- conda config --show-sources
- conda config --add channels conda-forge
- conda config --system --remove channels https://repo.anaconda.com/pkgs/main
- conda config --system --remove channels https://repo.anaconda.com/pkgs/r
- conda config --remove channels https://repo.anaconda.com/pkgs/main
- conda config --remove channels https://repo.anaconda.com/pkgs/r
- conda config --show-sources
- conda config --prepend channels conda-forge
- conda config --set channel_priority strict
- conda config --set always_yes yes --set changeps1 no
- conda create -q -n test-environment python=3.11
@@ -229,8 +206,9 @@ end-2-end-conda:
- cd ../
- pip install -e ./ophyd_devices
- pip install -e .[dev,pyside6]
- pytest -v --files-path ./ --start-servers --random-order ./tests/end-2-end
- pip install -e .[dev,pyqt6]
- cd ./tests/end-2-end
- pytest -v --start-servers --flush-redis --random-order
artifacts:
when: on_failure
@@ -245,7 +223,6 @@ end-2-end-conda:
- if: '$CI_PIPELINE_SOURCE == "parent_pipeline"'
- if: '$CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "main"'
- if: '$CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "production"'
- if: "$CI_MERGE_REQUEST_TARGET_BRANCH_NAME =~ /^pre_release.*$/"
semver:
stage: Deploy

File diff suppressed because it is too large Load Diff

View File

@@ -1,24 +1,12 @@
# BEC Widgets
[![CI](https://github.com/bec-project/bec_widgets/actions/workflows/ci.yml/badge.svg)](https://github.com/bec-project/bec_widgets/actions/workflows/ci.yml)
[![badge](https://img.shields.io/pypi/v/bec-widgets)](https://pypi.org/project/bec-widgets/)
[![License](https://img.shields.io/github/license/bec-project/bec_widgets)](./LICENSE)
[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
**⚠️ Important Notice:**
🚨 **PyQt6 is no longer supported** due to incompatibilities with Qt Designer. Please use **PySide6** instead. 🚨
BEC Widgets is a GUI framework designed for interaction with [BEC (Beamline Experiment Control)](https://gitlab.psi.ch/bec/bec).
## Installation
Use the package manager [pip](https://pip.pypa.io/en/stable/) to install BEC Widgets:
```bash
pip install bec_widgets[pyside6]
pip install bec_widgets PyQt6
```
For development purposes, you can clone the repository and install the package locally in editable mode:
@@ -26,12 +14,22 @@ For development purposes, you can clone the repository and install the package l
```bash
git clone https://gitlab.psi.ch/bec/bec-widgets
cd bec_widgets
pip install -e .[dev,pyside6]
pip install -e .[dev,pyqt6]
```
BEC Widgets now **only supports PySide6**. Users must manually install PySide6 as no default Qt distribution is
specified.
BEC Widgets currently supports both Pyside6 and PyQt6, however, no default distribution is specified. As a result, users must install one of the supported
Python Qt distributions manually.
To select a specific Python Qt distribution, install the package with an additional tag:
```bash
pip install bec_widgets[pyqt6]
```
or
```bash
pip install bec_widgets[pyside6]
```
## Documentation
Documentation of BEC Widgets can be found [here](https://bec-widgets.readthedocs.io/en/latest/). The documentation of the BEC can be found [here](https://bec.readthedocs.io/en/latest/).
@@ -41,7 +39,7 @@ Documentation of BEC Widgets can be found [here](https://bec-widgets.readthedocs
All commits should use the Angular commit scheme:
> #### <a name="commit-header"></a>Angular Commit Message Header
>
>
> ```
> <type>(<scope>): <short summary>
> │ │ │
@@ -55,13 +53,13 @@ All commits should use the Angular commit scheme:
>
> └─⫸ Commit Type: build|ci|docs|feat|fix|perf|refactor|test
> ```
>
>
> The `<type>` and `<summary>` fields are mandatory, the `(<scope>)` field is optional.
> ##### Type
>
>
> Must be one of the following:
>
>
> * **build**: Changes that affect the build system or external dependencies (example scopes: gulp, broccoli, npm)
> * **ci**: Changes to our CI configuration files and scripts (examples: CircleCi, SauceLabs)
> * **docs**: Documentation only changes
@@ -73,5 +71,4 @@ All commits should use the Angular commit scheme:
## License
[BSD-3-Clause](https://choosealicense.com/licenses/bsd-3-clause/)
[BSD-3-Clause](https://choosealicense.com/licenses/bsd-3-clause/)

View File

@@ -1,4 +0,0 @@
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.error_popups import SafeProperty, SafeSlot
__all__ = ["BECWidget", "SafeSlot", "SafeProperty"]

View File

@@ -0,0 +1,209 @@
""" This module contains the GUI for the 1D alignment application.
It is a preliminary version of the GUI, which will be added to the main branch and steadily updated to be improved.
"""
import os
from typing import Optional
from bec_lib.device import Positioner as BECPositioner
from bec_lib.device import Signal as BECSignal
from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from qtpy.QtCore import QSize, Signal
from qtpy.QtGui import QIcon
from qtpy.QtWidgets import (
QApplication,
QCheckBox,
QDoubleSpinBox,
QMainWindow,
QPushButton,
QSpinBox,
)
import bec_widgets
from bec_widgets.qt_utils.error_popups import SafeSlot as Slot
from bec_widgets.qt_utils.toolbar import WidgetAction
from bec_widgets.utils import UILoader
from bec_widgets.utils.bec_dispatcher import BECDispatcher
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.colors import get_accent_colors
from bec_widgets.widgets.bec_progressbar.bec_progressbar import BECProgressBar
from bec_widgets.widgets.device_line_edit.device_line_edit import DeviceLineEdit
from bec_widgets.widgets.lmfit_dialog.lmfit_dialog import LMFitDialog
from bec_widgets.widgets.positioner_box.positioner_box import PositionerBox
from bec_widgets.widgets.positioner_group.positioner_group import PositionerGroup
from bec_widgets.widgets.stop_button.stop_button import StopButton
from bec_widgets.widgets.toggle.toggle import ToggleSwitch
from bec_widgets.widgets.waveform.waveform_widget import BECWaveformWidget
MODULE_PATH = os.path.dirname(bec_widgets.__file__)
logger = bec_logger.logger
class Alignment1D:
"""Alignment GUI to perform 1D scans"""
def __init__(self, client=None, gui_id: Optional[str] = None) -> None:
"""Initialization
Args:
config: Configuration of the application.
client: BEC client object.
gui_id: GUI ID.
"""
self.bec_dispatcher = BECDispatcher(client=client)
self.client = self.bec_dispatcher.client if client is None else client
QApplication.instance().aboutToQuit.connect(self.close)
self.dev = self.client.device_manager.devices
self._accent_colors = get_accent_colors()
self.ui_file = "alignment_1d.ui"
self.ui = None
self.progress_bar = None
self.waveform = None
self.init_ui()
def init_ui(self):
"""Initialise the UI from QT Designer file"""
current_path = os.path.dirname(__file__)
self.ui = UILoader(None).loader(os.path.join(current_path, self.ui_file))
# Customize the plotting widget
self.waveform = self.ui.findChild(BECWaveformWidget, "bec_waveform_widget")
self._customise_bec_waveform_widget()
# Setup comboboxes for motor and signal selection
# FIXME after changing the filtering in the combobox
self._setup_signal_combobox()
# Setup motor indicator
self._setup_motor_indicator()
# Setup progress bar
self._setup_progress_bar()
# Add actions buttons
self._customise_buttons()
# Hook scaninfo updates
self.bec_dispatcher.connect_slot(self.scan_status_callback, MessageEndpoints.scan_status())
def show(self):
return self.ui.show()
##############################
############ SLOTS ###########
##############################
@Slot(dict, dict)
def scan_status_callback(self, content: dict, _) -> None:
"""This slot allows to enable/disable the UI critical components when a scan is running"""
if content["status"] in ["open"]:
self.enable_ui(False)
elif content["status"] in ["aborted", "halted", "closed"]:
self.enable_ui(True)
@Slot(tuple)
def move_to_center(self, move_request: tuple) -> None:
"""Move the selected motor to the center"""
motor = self.ui.device_combobox.currentText()
if move_request[0] in ["center", "center1", "center2"]:
pos = move_request[1]
self.dev.get(motor).move(float(pos), relative=False)
@Slot()
def reset_progress_bar(self) -> None:
"""Reset the progress bar"""
self.progress_bar.set_value(0)
self.progress_bar.set_minimum(0)
@Slot(dict, dict)
def update_progress_bar(self, content: dict, _) -> None:
"""Hook to update the progress bar
Args:
content: Content of the scan progress message.
metadata: Metadata of the message.
"""
if content["max_value"] == 0:
self.progress_bar.set_value(0)
return
self.progress_bar.set_maximum(content["max_value"])
self.progress_bar.set_value(content["value"])
@Slot()
def clear_queue(self) -> None:
"""Clear the scan queue"""
self.queue.request_queue_reset()
##############################
######## END OF SLOTS ########
##############################
def enable_ui(self, enable: bool) -> None:
"""Enable or disable the UI components"""
# Enable/disable motor and signal selection
self.ui.device_combobox_2.setEnabled(enable)
# Enable/disable DAP selection
self.ui.dap_combo_box.setEnabled(enable)
# Enable/disable Scan Button
# self.ui.scan_button.setEnabled(enable)
# Disable move to buttons in LMFitDialog
self.ui.findChild(LMFitDialog).set_actions_enabled(enable)
def _customise_buttons(self) -> None:
"""Add action buttons for the Action Control.
In addition, we are adding a callback to also clear the queue to the stop button
to ensure that upon clicking the button, no scans from another client may be queued
which would be confusing without the queue widget.
"""
fit_dialog = self.ui.findChild(LMFitDialog)
fit_dialog.active_action_list = ["center", "center1", "center2"]
fit_dialog.move_action.connect(self.move_to_center)
stop_button = self.ui.findChild(StopButton)
stop_button.button.setText("Stop and Clear Queue")
stop_button.button.clicked.connect(self.clear_queue)
def _customise_bec_waveform_widget(self) -> None:
"""Customise the BEC Waveform Widget, i.e. clear the toolbar"""
self.waveform.toolbar.clear()
def _setup_motor_indicator(self) -> None:
"""Setup the arrow item"""
self.waveform.waveform.tick_item.add_to_plot()
positioner_box = self.ui.findChild(PositionerGroup)
positioner_box.position_update.connect(self.waveform.waveform.tick_item.set_position)
self.waveform.waveform.tick_item.set_position(0)
def _setup_signal_combobox(self) -> None:
"""Setup signal selection"""
# FIXME after changing the filtering in the combobox
signals = [name for name in self.dev if isinstance(self.dev.get(name), BECSignal)]
self.ui.device_combobox_2.setCurrentText(signals[0])
self.ui.device_combobox_2.set_device_filter("Signal")
def _setup_progress_bar(self) -> None:
"""Setup progress bar"""
# FIXME once the BECScanProgressBar is implemented
self.progress_bar = self.ui.findChild(BECProgressBar, "bec_progress_bar")
self.progress_bar.set_value(0)
self.ui.bec_waveform_widget.new_scan.connect(self.reset_progress_bar)
self.bec_dispatcher.connect_slot(self.update_progress_bar, MessageEndpoints.scan_progress())
def close(self):
logger.info("Disconnecting", repr(self.bec_dispatcher))
self.bec_dispatcher.disconnect_all()
logger.info("Shutting down BEC Client", repr(self.client))
self.client.shutdown()
def main():
import sys
app = QApplication(sys.argv)
icon = QIcon()
icon.addFile(
os.path.join(MODULE_PATH, "assets", "app_icons", "alignment_1d.png"), size=QSize(48, 48)
)
app.setWindowIcon(icon)
window = Alignment1D()
window.show()
sys.exit(app.exec_())
if __name__ == "__main__": # pragma: no cover
main()

View File

@@ -0,0 +1,615 @@
<?xml version="1.0" encoding="UTF-8"?>
<ui version="4.0">
<class>mainWindow</class>
<widget class="QMainWindow" name="mainWindow">
<property name="geometry">
<rect>
<x>0</x>
<y>0</y>
<width>1611</width>
<height>1019</height>
</rect>
</property>
<property name="windowTitle">
<string>Alignment tool</string>
</property>
<widget class="QWidget" name="widget">
<layout class="QVBoxLayout" name="verticalLayout_2">
<item>
<widget class="QWidget" name="widget" native="true">
<property name="sizePolicy">
<sizepolicy hsizetype="Preferred" vsizetype="Fixed">
<horstretch>0</horstretch>
<verstretch>0</verstretch>
</sizepolicy>
</property>
<layout class="QHBoxLayout" name="horizontalLayout_5">
<item>
<widget class="DarkModeButton" name="dark_mode_button"/>
</item>
<item>
<spacer name="horizontalSpacer_6">
<property name="orientation">
<enum>Qt::Orientation::Horizontal</enum>
</property>
<property name="sizeHint" stdset="0">
<size>
<width>40</width>
<height>20</height>
</size>
</property>
</spacer>
</item>
<item>
<widget class="BECStatusBox" name="bec_status_box">
<property name="compact_view" stdset="0">
<bool>true</bool>
</property>
<property name="label" stdset="0">
<string>BEC Servers</string>
</property>
</widget>
</item>
<item>
<spacer name="horizontalSpacer_3">
<property name="orientation">
<enum>Qt::Orientation::Horizontal</enum>
</property>
<property name="sizeHint" stdset="0">
<size>
<width>40</width>
<height>20</height>
</size>
</property>
</spacer>
</item>
<item>
<widget class="BECQueue" name="bec_queue">
<property name="compact_view" stdset="0">
<bool>true</bool>
</property>
</widget>
</item>
<item>
<spacer name="horizontalSpacer_5">
<property name="orientation">
<enum>Qt::Orientation::Horizontal</enum>
</property>
<property name="sizeHint" stdset="0">
<size>
<width>40</width>
<height>20</height>
</size>
</property>
</spacer>
</item>
<item>
<widget class="QRadioButton" name="radioButton">
<property name="enabled">
<bool>false</bool>
</property>
<property name="text">
<string>SLS Light On</string>
</property>
<property name="checkable">
<bool>true</bool>
</property>
<property name="checked">
<bool>true</bool>
</property>
<property name="autoExclusive">
<bool>false</bool>
</property>
</widget>
</item>
<item>
<spacer name="horizontalSpacer_7">
<property name="orientation">
<enum>Qt::Orientation::Horizontal</enum>
</property>
<property name="sizeHint" stdset="0">
<size>
<width>40</width>
<height>20</height>
</size>
</property>
</spacer>
</item>
<item>
<widget class="QRadioButton" name="radioButton_3">
<property name="enabled">
<bool>false</bool>
</property>
<property name="text">
<string>BEAMLINE Checks</string>
</property>
<property name="checkable">
<bool>true</bool>
</property>
<property name="checked">
<bool>true</bool>
</property>
<property name="autoExclusive">
<bool>false</bool>
</property>
</widget>
</item>
<item>
<spacer name="horizontalSpacer_4">
<property name="orientation">
<enum>Qt::Orientation::Horizontal</enum>
</property>
<property name="sizeHint" stdset="0">
<size>
<width>40</width>
<height>20</height>
</size>
</property>
</spacer>
</item>
<item>
<widget class="StopButton" name="stop_button">
<property name="sizePolicy">
<sizepolicy hsizetype="MinimumExpanding" vsizetype="MinimumExpanding">
<horstretch>0</horstretch>
<verstretch>0</verstretch>
</sizepolicy>
</property>
<property name="minimumSize">
<size>
<width>200</width>
<height>40</height>
</size>
</property>
<property name="maximumSize">
<size>
<width>200</width>
<height>40</height>
</size>
</property>
</widget>
</item>
</layout>
</widget>
</item>
<item>
<widget class="BECProgressBar" name="bec_progress_bar">
<property name="minimumSize">
<size>
<width>0</width>
<height>25</height>
</size>
</property>
<property name="maximumSize">
<size>
<width>16777215</width>
<height>25</height>
</size>
</property>
</widget>
</item>
<item>
<widget class="QTabWidget" name="tabWidget">
<property name="sizePolicy">
<sizepolicy hsizetype="Expanding" vsizetype="Expanding">
<horstretch>0</horstretch>
<verstretch>0</verstretch>
</sizepolicy>
</property>
<property name="currentIndex">
<number>0</number>
</property>
<widget class="QWidget" name="ControlTab">
<attribute name="title">
<string>Alignment Control</string>
</attribute>
<layout class="QHBoxLayout" name="horizontalLayout_6">
<item>
<widget class="QWidget" name="widget_4" native="true">
<property name="sizePolicy">
<sizepolicy hsizetype="Preferred" vsizetype="Preferred">
<horstretch>1</horstretch>
<verstretch>0</verstretch>
</sizepolicy>
</property>
<layout class="QVBoxLayout" name="verticalLayout_3">
<item>
<widget class="ScanControl" name="scan_control">
<property name="current_scan" stdset="0">
<string>line_scan</string>
</property>
<property name="hide_arg_box" stdset="0">
<bool>false</bool>
</property>
<property name="hide_scan_selection_combobox" stdset="0">
<bool>true</bool>
</property>
<property name="hide_add_remove_buttons" stdset="0">
<bool>true</bool>
</property>
</widget>
</item>
<item>
<widget class="PositionerGroup" name="positioner_group"/>
</item>
<item>
<spacer name="verticalSpacer">
<property name="orientation">
<enum>Qt::Orientation::Vertical</enum>
</property>
<property name="sizeHint" stdset="0">
<size>
<width>20</width>
<height>40</height>
</size>
</property>
</spacer>
</item>
</layout>
</widget>
</item>
<item>
<widget class="QWidget" name="widget_3" native="true">
<property name="sizePolicy">
<sizepolicy hsizetype="Preferred" vsizetype="Preferred">
<horstretch>4</horstretch>
<verstretch>0</verstretch>
</sizepolicy>
</property>
<layout class="QVBoxLayout" name="verticalLayout">
<item>
<widget class="QWidget" name="widget_2" native="true">
<property name="sizePolicy">
<sizepolicy hsizetype="Preferred" vsizetype="Fixed">
<horstretch>0</horstretch>
<verstretch>0</verstretch>
</sizepolicy>
</property>
<layout class="QHBoxLayout" name="horizontalLayout_3">
<item>
<widget class="QLabel" name="label_2">
<property name="font">
<font/>
</property>
<property name="text">
<string>Monitor</string>
</property>
</widget>
</item>
<item>
<widget class="DeviceComboBox" name="device_combobox_2"/>
</item>
<item>
<widget class="QLabel" name="label_3">
<property name="font">
<font/>
</property>
<property name="text">
<string>LMFit Model</string>
</property>
</widget>
</item>
<item>
<widget class="DapComboBox" name="dap_combo_box"/>
</item>
<item>
<widget class="QLabel" name="label">
<property name="text">
<string>Enable ROI</string>
</property>
</widget>
</item>
<item>
<widget class="ToggleSwitch" name="toggle_switch">
<property name="sizePolicy">
<sizepolicy hsizetype="Expanding" vsizetype="Expanding">
<horstretch>3</horstretch>
<verstretch>0</verstretch>
</sizepolicy>
</property>
<property name="toolTip">
<string>Activate linear region select for LMFit</string>
</property>
<property name="layoutDirection">
<enum>Qt::LayoutDirection::LeftToRight</enum>
</property>
<property name="checked" stdset="0">
<bool>false</bool>
</property>
</widget>
</item>
<item>
<spacer name="horizontalSpacer_8">
<property name="orientation">
<enum>Qt::Orientation::Horizontal</enum>
</property>
<property name="sizeHint" stdset="0">
<size>
<width>40</width>
<height>20</height>
</size>
</property>
</spacer>
</item>
</layout>
</widget>
</item>
<item>
<widget class="BECWaveformWidget" name="bec_waveform_widget">
<property name="sizePolicy">
<sizepolicy hsizetype="Expanding" vsizetype="Expanding">
<horstretch>0</horstretch>
<verstretch>1</verstretch>
</sizepolicy>
</property>
<property name="minimumSize">
<size>
<width>600</width>
<height>450</height>
</size>
</property>
<property name="clear_curves_on_plot_update" stdset="0">
<bool>true</bool>
</property>
</widget>
</item>
<item>
<widget class="LMFitDialog" name="lm_fit_dialog">
<property name="sizePolicy">
<sizepolicy hsizetype="Preferred" vsizetype="Minimum">
<horstretch>0</horstretch>
<verstretch>0</verstretch>
</sizepolicy>
</property>
<property name="minimumSize">
<size>
<width>0</width>
<height>190</height>
</size>
</property>
<property name="always_show_latest" stdset="0">
<bool>true</bool>
</property>
<property name="hide_curve_selection" stdset="0">
<bool>true</bool>
</property>
<property name="hide_summary" stdset="0">
<bool>true</bool>
</property>
</widget>
</item>
</layout>
</widget>
</item>
</layout>
</widget>
<widget class="QWidget" name="tab_2">
<attribute name="title">
<string>Logbook</string>
</attribute>
<layout class="QHBoxLayout" name="horizontalLayout_4">
<property name="leftMargin">
<number>2</number>
</property>
<property name="topMargin">
<number>2</number>
</property>
<property name="rightMargin">
<number>2</number>
</property>
<property name="bottomMargin">
<number>2</number>
</property>
<item>
<widget class="WebsiteWidget" name="website_widget">
<property name="url" stdset="0">
<string>https://scilog.psi.ch/login</string>
</property>
</widget>
</item>
</layout>
</widget>
</widget>
</item>
</layout>
</widget>
</widget>
<customwidgets>
<customwidget>
<class>DapComboBox</class>
<extends>QWidget</extends>
<header>dap_combo_box</header>
</customwidget>
<customwidget>
<class>StopButton</class>
<extends>QWidget</extends>
<header>stop_button</header>
</customwidget>
<customwidget>
<class>WebsiteWidget</class>
<extends>QWidget</extends>
<header>website_widget</header>
</customwidget>
<customwidget>
<class>BECQueue</class>
<extends>QWidget</extends>
<header>bec_queue</header>
</customwidget>
<customwidget>
<class>ScanControl</class>
<extends>QWidget</extends>
<header>scan_control</header>
</customwidget>
<customwidget>
<class>ToggleSwitch</class>
<extends>QWidget</extends>
<header>toggle_switch</header>
</customwidget>
<customwidget>
<class>BECProgressBar</class>
<extends>QWidget</extends>
<header>bec_progress_bar</header>
</customwidget>
<customwidget>
<class>DarkModeButton</class>
<extends>QWidget</extends>
<header>dark_mode_button</header>
</customwidget>
<customwidget>
<class>PositionerGroup</class>
<extends>QWidget</extends>
<header>positioner_group</header>
</customwidget>
<customwidget>
<class>BECWaveformWidget</class>
<extends>QWidget</extends>
<header>bec_waveform_widget</header>
</customwidget>
<customwidget>
<class>DeviceComboBox</class>
<extends>QComboBox</extends>
<header>device_combobox</header>
</customwidget>
<customwidget>
<class>LMFitDialog</class>
<extends>QWidget</extends>
<header>lm_fit_dialog</header>
</customwidget>
<customwidget>
<class>BECStatusBox</class>
<extends>QWidget</extends>
<header>bec_status_box</header>
</customwidget>
</customwidgets>
<resources/>
<connections>
<connection>
<sender>toggle_switch</sender>
<signal>enabled(bool)</signal>
<receiver>bec_waveform_widget</receiver>
<slot>toogle_roi_select(bool)</slot>
<hints>
<hint type="sourcelabel">
<x>1042</x>
<y>212</y>
</hint>
<hint type="destinationlabel">
<x>1416</x>
<y>322</y>
</hint>
</hints>
</connection>
<connection>
<sender>bec_waveform_widget</sender>
<signal>dap_summary_update(QVariantMap,QVariantMap)</signal>
<receiver>lm_fit_dialog</receiver>
<slot>update_summary_tree(QVariantMap,QVariantMap)</slot>
<hints>
<hint type="sourcelabel">
<x>1099</x>
<y>258</y>
</hint>
<hint type="destinationlabel">
<x>1157</x>
<y>929</y>
</hint>
</hints>
</connection>
<connection>
<sender>device_combobox_2</sender>
<signal>currentTextChanged(QString)</signal>
<receiver>bec_waveform_widget</receiver>
<slot>plot(QString)</slot>
<hints>
<hint type="sourcelabel">
<x>577</x>
<y>215</y>
</hint>
<hint type="destinationlabel">
<x>1416</x>
<y>427</y>
</hint>
</hints>
</connection>
<connection>
<sender>device_combobox_2</sender>
<signal>currentTextChanged(QString)</signal>
<receiver>dap_combo_box</receiver>
<slot>select_y_axis(QString)</slot>
<hints>
<hint type="sourcelabel">
<x>577</x>
<y>215</y>
</hint>
<hint type="destinationlabel">
<x>909</x>
<y>215</y>
</hint>
</hints>
</connection>
<connection>
<sender>dap_combo_box</sender>
<signal>new_dap_config(QString,QString,QString)</signal>
<receiver>bec_waveform_widget</receiver>
<slot>add_dap(QString,QString,QString)</slot>
<hints>
<hint type="sourcelabel">
<x>909</x>
<y>215</y>
</hint>
<hint type="destinationlabel">
<x>1416</x>
<y>447</y>
</hint>
</hints>
</connection>
<connection>
<sender>scan_control</sender>
<signal>device_selected(QString)</signal>
<receiver>positioner_group</receiver>
<slot>set_positioners(QString)</slot>
<hints>
<hint type="sourcelabel">
<x>230</x>
<y>306</y>
</hint>
<hint type="destinationlabel">
<x>187</x>
<y>926</y>
</hint>
</hints>
</connection>
<connection>
<sender>scan_control</sender>
<signal>device_selected(QString)</signal>
<receiver>bec_waveform_widget</receiver>
<slot>set_x(QString)</slot>
<hints>
<hint type="sourcelabel">
<x>187</x>
<y>356</y>
</hint>
<hint type="destinationlabel">
<x>972</x>
<y>509</y>
</hint>
</hints>
</connection>
<connection>
<sender>scan_control</sender>
<signal>device_selected(QString)</signal>
<receiver>dap_combo_box</receiver>
<slot>select_x_axis(QString)</slot>
<hints>
<hint type="sourcelabel">
<x>187</x>
<y>356</y>
</hint>
<hint type="destinationlabel">
<x>794</x>
<y>202</y>
</hint>
</hints>
</connection>
</connections>
</ui>

View File

@@ -0,0 +1,84 @@
"""
Launcher for BEC GUI Applications
Application must be located in bec_widgets/applications ;
in order for the launcher to find the application, it has to be put in
a subdirectory with the same name as the main Python module:
/bec_widgets/applications
├── alignment
│ └── alignment_1d
│ └── alignment_1d.py
├── other_app
└── other_app.py
The tree above would contain 2 applications, alignment_1d and other_app.
The Python module for the application must have `if __name__ == "__main__":`
in order for the launcher to execute it (it is run with `python -m`).
"""
import argparse
import os
import sys
MODULE_PATH = os.path.dirname(__file__)
def find_apps(base_dir: str) -> list[str]:
matching_modules = []
for root, dirs, files in os.walk(base_dir):
parent_dir = os.path.basename(root)
for file in files:
if file.endswith(".py") and file != "__init__.py":
file_name_without_ext = os.path.splitext(file)[0]
if file_name_without_ext == parent_dir:
rel_path = os.path.relpath(root, base_dir)
module_path = rel_path.replace(os.sep, ".")
module_name = f"{module_path}.{file_name_without_ext}"
matching_modules.append((file_name_without_ext, module_name))
return matching_modules
def main():
parser = argparse.ArgumentParser(description="BEC application launcher")
parser.add_argument("-m", "--module", type=str, help="The module to run (string argument).")
# Add a positional argument for the module, which acts as a fallback if -m is not provided
parser.add_argument(
"positional_module",
nargs="?", # This makes the positional argument optional
help="Positional argument that is treated as module if -m is not specified.",
)
args = parser.parse_args()
# If the -m/--module is not provided, fallback to the positional argument
module = args.module if args.module else args.positional_module
if module:
for app_name, app_module in find_apps(MODULE_PATH):
if module in (app_name, app_module):
print("Starting:", app_name)
python_executable = sys.executable
# Replace the current process with the new Python module
os.execvp(
python_executable,
[python_executable, "-m", f"bec_widgets.applications.{app_module}"],
)
print(f"Error: cannot find application {module}")
# display list of apps
print("Available applications:")
for app, _ in find_apps(MODULE_PATH):
print(f" - {app}")
if __name__ == "__main__": # pragma: no cover
main()

View File

@@ -1,23 +0,0 @@
from __future__ import annotations
from bec_widgets.widgets.containers.auto_update.auto_updates import AutoUpdates
from bec_widgets.widgets.containers.dock.dock_area import BECDockArea
def dock_area(object_name: str | None = None) -> BECDockArea:
_dock_area = BECDockArea(object_name=object_name, root_widget=True)
return _dock_area
def auto_update_dock_area(object_name: str | None = None) -> AutoUpdates:
"""
Create a dock area with auto update enabled.
Args:
object_name(str): The name of the dock area.
Returns:
BECDockArea: The created dock area.
"""
_auto_update = AutoUpdates(object_name=object_name)
return _auto_update

View File

@@ -1,517 +0,0 @@
from __future__ import annotations
import os
import xml.etree.ElementTree as ET
from typing import TYPE_CHECKING, Callable
from bec_lib.logger import bec_logger
from qtpy.QtCore import Qt, Signal # type: ignore
from qtpy.QtGui import QPainter, QPainterPath, QPixmap
from qtpy.QtWidgets import (
QApplication,
QComboBox,
QFileDialog,
QHBoxLayout,
QLabel,
QPushButton,
QSizePolicy,
QSpacerItem,
QWidget,
)
import bec_widgets
from bec_widgets.cli.rpc.rpc_register import RPCRegister
from bec_widgets.utils.bec_plugin_helper import get_all_plugin_widgets
from bec_widgets.utils.container_utils import WidgetContainerUtils
from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.utils.name_utils import pascal_to_snake
from bec_widgets.utils.plugin_utils import get_plugin_auto_updates
from bec_widgets.utils.round_frame import RoundedFrame
from bec_widgets.utils.toolbar import ModularToolBar
from bec_widgets.utils.ui_loader import UILoader
from bec_widgets.widgets.containers.auto_update.auto_updates import AutoUpdates
from bec_widgets.widgets.containers.dock.dock_area import BECDockArea
from bec_widgets.widgets.containers.main_window.main_window import BECMainWindow, UILaunchWindow
from bec_widgets.widgets.utility.visual.dark_mode_button.dark_mode_button import DarkModeButton
if TYPE_CHECKING: # pragma: no cover
from qtpy.QtCore import QObject
from bec_widgets.utils.bec_widget import BECWidget
logger = bec_logger.logger
MODULE_PATH = os.path.dirname(bec_widgets.__file__)
class LaunchTile(RoundedFrame):
open_signal = Signal()
def __init__(
self,
parent: QObject | None = None,
icon_path: str | None = None,
top_label: str | None = None,
main_label: str | None = None,
description: str | None = None,
show_selector: bool = False,
):
super().__init__(parent=parent, orientation="vertical")
self.icon_label = QLabel(parent=self)
self.icon_label.setFixedSize(100, 100)
self.icon_label.setScaledContents(True)
pixmap = QPixmap(icon_path)
if not pixmap.isNull():
size = 100
circular_pixmap = QPixmap(size, size)
circular_pixmap.fill(Qt.transparent)
painter = QPainter(circular_pixmap)
painter.setRenderHints(QPainter.Antialiasing, True)
path = QPainterPath()
path.addEllipse(0, 0, size, size)
painter.setClipPath(path)
pixmap = pixmap.scaled(size, size, Qt.KeepAspectRatio, Qt.SmoothTransformation)
painter.drawPixmap(0, 0, pixmap)
painter.end()
self.icon_label.setPixmap(circular_pixmap)
self.layout.addWidget(self.icon_label, alignment=Qt.AlignCenter)
# Top label
self.top_label = QLabel(top_label.upper())
font_top = self.top_label.font()
font_top.setPointSize(10)
self.top_label.setFont(font_top)
self.layout.addWidget(self.top_label, alignment=Qt.AlignCenter)
# Main label
self.main_label = QLabel(main_label)
font_main = self.main_label.font()
font_main.setPointSize(14)
font_main.setBold(True)
self.main_label.setFont(font_main)
self.main_label.setWordWrap(True)
self.main_label.setAlignment(Qt.AlignCenter)
self.layout.addWidget(self.main_label)
self.spacer_top = QSpacerItem(0, 10, QSizePolicy.Fixed, QSizePolicy.Fixed)
self.layout.addItem(self.spacer_top)
# Description
self.description_label = QLabel(description)
self.description_label.setWordWrap(True)
self.description_label.setAlignment(Qt.AlignCenter)
self.layout.addWidget(self.description_label)
# Selector
if show_selector:
self.selector = QComboBox(self)
self.layout.addWidget(self.selector)
else:
self.selector = None
self.spacer_bottom = QSpacerItem(0, 0, QSizePolicy.Fixed, QSizePolicy.Expanding)
self.layout.addItem(self.spacer_bottom)
# Action button
self.action_button = QPushButton("Open")
self.action_button.setStyleSheet(
"""
QPushButton {
background-color: #007AFF;
border: none;
padding: 8px 16px;
color: white;
border-radius: 6px;
font-weight: bold;
}
QPushButton:hover {
background-color: #005BB5;
}
"""
)
self.layout.addWidget(self.action_button, alignment=Qt.AlignCenter)
class LaunchWindow(BECMainWindow):
RPC = True
TILE_SIZE = (250, 300)
USER_ACCESS = ["show_launcher", "hide_launcher"]
def __init__(
self, parent=None, gui_id: str = None, window_title="BEC Launcher", *args, **kwargs
):
super().__init__(parent=parent, gui_id=gui_id, window_title=window_title, **kwargs)
self.app = QApplication.instance()
self.tiles: dict[str, LaunchTile] = {}
# Toolbar
self.dark_mode_button = DarkModeButton(parent=self, toolbar=True)
self.toolbar = ModularToolBar(parent=self)
self.addToolBar(Qt.TopToolBarArea, self.toolbar)
self.spacer = QWidget(self)
self.spacer.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
self.toolbar.addWidget(self.spacer)
self.toolbar.addWidget(self.dark_mode_button)
# Main Widget
self.central_widget = QWidget(self)
self.central_widget.layout = QHBoxLayout(self.central_widget)
self.setCentralWidget(self.central_widget)
self.register_tile(
name="dock_area",
icon_path=os.path.join(MODULE_PATH, "assets", "app_icons", "bec_widgets_icon.png"),
top_label="Get started",
main_label="BEC Dock Area",
description="Highly flexible and customizable dock area application with modular widgets.",
action_button=lambda: self.launch("dock_area"),
show_selector=False,
)
self.available_auto_updates: dict[str, type[AutoUpdates]] = (
self._update_available_auto_updates()
)
self.register_tile(
name="auto_update",
icon_path=os.path.join(MODULE_PATH, "assets", "app_icons", "auto_update.png"),
top_label="Get automated",
main_label="BEC Auto Update Dock Area",
description="Dock area with auto update functionality for BEC widgets plotting.",
action_button=self._open_auto_update,
show_selector=True,
selector_items=list(self.available_auto_updates.keys()) + ["Default"],
)
self.register_tile(
name="custom_ui_file",
icon_path=os.path.join(MODULE_PATH, "assets", "app_icons", "ui_loader_tile.png"),
top_label="Get customized",
main_label="Launch Custom UI File",
description="GUI application with custom UI file.",
action_button=self._open_custom_ui_file,
show_selector=False,
)
# plugin widgets
self.available_widgets: dict[str, BECWidget] = get_all_plugin_widgets()
if self.available_widgets:
plugin_repo_name = next(iter(self.available_widgets.values())).__module__.split(".")[0]
plugin_repo_name = plugin_repo_name.removesuffix("_bec").upper()
self.register_tile(
name="widget",
icon_path=os.path.join(
MODULE_PATH, "assets", "app_icons", "widget_launch_tile.png"
),
top_label="Get quickly started",
main_label=f"Launch a {plugin_repo_name} Widget",
description=f"GUI application with one widget from the {plugin_repo_name} repository.",
action_button=self._open_widget,
show_selector=True,
selector_items=list(self.available_widgets.keys()),
)
self._update_theme()
self.register = RPCRegister()
self.register.callbacks.append(self._turn_off_the_lights)
self.register.broadcast()
def register_tile(
self,
name: str,
icon_path: str | None = None,
top_label: str | None = None,
main_label: str | None = None,
description: str | None = None,
action_button: Callable | None = None,
show_selector: bool = False,
selector_items: list[str] | None = None,
):
"""
Register a tile in the launcher window.
Args:
name(str): The name of the tile.
icon_path(str): The path to the icon.
top_label(str): The top label of the tile.
main_label(str): The main label of the tile.
description(str): The description of the tile.
action_button(callable): The action to be performed when the button is clicked.
show_selector(bool): Whether to show a selector or not.
selector_items(list[str]): The items to be shown in the selector.
"""
tile = LaunchTile(
icon_path=icon_path,
top_label=top_label,
main_label=main_label,
description=description,
show_selector=show_selector,
)
tile.setFixedSize(*self.TILE_SIZE)
if action_button:
tile.action_button.clicked.connect(action_button)
if show_selector and selector_items:
tile.selector.addItems(selector_items)
self.central_widget.layout.addWidget(tile)
self.tiles[name] = tile
def launch(
self,
launch_script: str,
name: str | None = None,
geometry: tuple[int, int, int, int] | None = None,
**kwargs,
) -> QWidget | None:
"""Launch the specified script. If the launch script creates a QWidget, it will be
embedded in a BECMainWindow. If the launch script creates a BECMainWindow, it will be shown
as a separate window.
Args:
launch_script(str): The name of the script to be launched.
name(str): The name of the dock area.
geometry(tuple): The geometry parameters to be passed to the dock area.
Returns:
QWidget: The created dock area.
"""
from bec_widgets.applications import bw_launch
with RPCRegister.delayed_broadcast() as rpc_register:
existing_dock_areas = rpc_register.get_names_of_rpc_by_class_type(BECDockArea)
if name is not None:
if name in existing_dock_areas:
raise ValueError(
f"Name {name} must be unique for dock areas, but already exists: {existing_dock_areas}."
)
WidgetContainerUtils.raise_for_invalid_name(name)
else:
name = "dock_area"
name = WidgetContainerUtils.generate_unique_name(name, existing_dock_areas)
if launch_script is None:
launch_script = "dock_area"
if not isinstance(launch_script, str):
raise ValueError(f"Launch script must be a string, but got {type(launch_script)}.")
if launch_script == "custom_ui_file":
ui_file = kwargs.pop("ui_file", None)
if not ui_file:
return None
return self._launch_custom_ui_file(ui_file)
if launch_script == "auto_update":
auto_update = kwargs.pop("auto_update", None)
return self._launch_auto_update(auto_update)
if launch_script == "widget":
widget = kwargs.pop("widget", None)
if widget is None:
raise ValueError("Widget name must be provided.")
return self._launch_widget(widget)
launch = getattr(bw_launch, launch_script, None)
if launch is None:
raise ValueError(f"Launch script {launch_script} not found.")
result_widget = launch(name)
result_widget.resize(result_widget.minimumSizeHint())
# TODO Should we simply use the specified name as title here?
result_widget.window().setWindowTitle(f"BEC - {name}")
logger.info(f"Created new dock area: {name}")
if geometry is not None:
result_widget.setGeometry(*geometry)
if isinstance(result_widget, BECMainWindow):
result_widget.show()
else:
window = BECMainWindow()
window.setCentralWidget(result_widget)
window.setWindowTitle(f"BEC - {result_widget.objectName()}")
window.show()
return result_widget
def _launch_custom_ui_file(self, ui_file: str | None) -> BECMainWindow:
# Load the custom UI file
if ui_file is None:
raise ValueError("UI file must be provided for custom UI file launch.")
filename = os.path.basename(ui_file).split(".")[0]
WidgetContainerUtils.raise_for_invalid_name(filename)
tree = ET.parse(ui_file)
root = tree.getroot()
# Check if the top-level widget is a QMainWindow
widget = root.find("widget")
if widget is None:
raise ValueError("No widget found in the UI file.")
if widget.attrib.get("class") == "QMainWindow":
raise ValueError(
"Loading a QMainWindow from a UI file is currently not supported. "
"If you need this, please contact the BEC team or create a ticket on gitlab.psi.ch/bec/bec_widgets."
)
window = UILaunchWindow(object_name=filename)
QApplication.processEvents()
result_widget = UILoader(window).loader(ui_file)
window.setCentralWidget(result_widget)
window.setWindowTitle(f"BEC - {window.object_name}")
window.show()
logger.info(f"Object name of new instance: {result_widget.objectName()}, {window.gui_id}")
return window
def _launch_auto_update(self, auto_update: str) -> AutoUpdates:
if auto_update in self.available_auto_updates:
auto_update_cls = self.available_auto_updates[auto_update]
window = auto_update_cls()
else:
auto_update = "auto_updates"
window = AutoUpdates()
window.resize(window.minimumSizeHint())
QApplication.processEvents()
window.setWindowTitle(f"BEC - {window.objectName()}")
window.show()
return window
def _launch_widget(self, widget: type[BECWidget]) -> QWidget:
name = pascal_to_snake(widget.__name__)
WidgetContainerUtils.raise_for_invalid_name(name)
window = BECMainWindow()
widget_instance = widget(root_widget=True, object_name=name)
assert isinstance(widget_instance, QWidget)
QApplication.processEvents()
window.setCentralWidget(widget_instance)
window.resize(window.minimumSizeHint())
window.setWindowTitle(f"BEC - {widget_instance.objectName()}")
window.show()
return window
def apply_theme(self, theme: str):
"""
Change the theme of the application.
"""
for tile in self.tiles.values():
tile.apply_theme(theme)
super().apply_theme(theme)
def _open_auto_update(self):
"""
Open the auto update window.
"""
if self.tiles["auto_update"].selector is None:
auto_update = None
else:
auto_update = self.tiles["auto_update"].selector.currentText()
if auto_update == "Default":
auto_update = None
return self.launch("auto_update", auto_update=auto_update)
def _open_widget(self):
"""
Open a widget from the available widgets.
"""
if self.tiles["widget"].selector is None:
return
widget = self.tiles["widget"].selector.currentText()
if widget not in self.available_widgets:
raise ValueError(f"Widget {widget} not found in available widgets.")
return self.launch("widget", widget=self.available_widgets[widget])
@SafeSlot(popup_error=True)
def _open_custom_ui_file(self):
"""
Open a file dialog to select a custom UI file and launch it.
"""
ui_file, _ = QFileDialog.getOpenFileName(
self, "Select UI File", "", "UI Files (*.ui);;All Files (*)"
)
self.launch("custom_ui_file", ui_file=ui_file)
@staticmethod
def _update_available_auto_updates() -> dict[str, type[AutoUpdates]]:
"""
Load all available auto updates from the plugin repository.
"""
try:
auto_updates = get_plugin_auto_updates()
logger.info(f"Available auto updates: {auto_updates.keys()}")
except Exception as exc:
logger.error(f"Failed to load auto updates: {exc}")
return {}
return auto_updates
def show_launcher(self):
"""
Show the launcher window.
"""
self.show()
def hide_launcher(self):
"""
Hide the launcher window.
"""
self.hide()
def showEvent(self, event):
super().showEvent(event)
self.setFixedSize(self.size())
def _launcher_is_last_widget(self, connections: dict) -> bool:
"""
Check if the launcher is the last widget in the application.
"""
remaining_connections = [
connection for connection in connections.values() if connection.parent_id != self.gui_id
]
return len(remaining_connections) <= 1
def _turn_off_the_lights(self, connections: dict):
"""
If there is only one connection remaining, it is the launcher, so we show it.
Once the launcher is closed as the last window, we quit the application.
"""
if self._launcher_is_last_widget(connections):
self.show()
self.activateWindow()
self.raise_()
if self.app:
self.app.setQuitOnLastWindowClosed(True) # type: ignore
return
self.hide()
if self.app:
self.app.setQuitOnLastWindowClosed(False) # type: ignore
def closeEvent(self, event):
"""
Close the launcher window.
"""
connections = self.register.list_all_connections()
if self._launcher_is_last_widget(connections):
event.accept()
return
event.ignore()
self.hide()
if __name__ == "__main__":
import sys
app = QApplication(sys.argv)
launcher = LaunchWindow()
launcher.show()
sys.exit(app.exec())

Binary file not shown.

Before

Width:  |  Height:  |  Size: 2.1 MiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 1.7 MiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 2.1 MiB

View File

@@ -0,0 +1 @@
from .client import *

View File

@@ -0,0 +1,184 @@
from __future__ import annotations
import threading
from queue import Queue
from typing import TYPE_CHECKING
from pydantic import BaseModel
if TYPE_CHECKING:
from .client import BECDockArea, BECFigure
class ScanInfo(BaseModel):
scan_id: str
scan_number: int
scan_name: str
scan_report_devices: list
monitored_devices: list
status: str
model_config: dict = {"validate_assignment": True}
class AutoUpdates:
create_default_dock: bool = False
enabled: bool = False
dock_name: str = None
def __init__(self, gui: BECDockArea):
self.gui = gui
self.msg_queue = Queue()
self.auto_update_thread = None
self._shutdown_sentinel = object()
self.start()
def start(self):
"""
Start the auto update thread.
"""
self.auto_update_thread = threading.Thread(target=self.process_queue)
self.auto_update_thread.start()
def start_default_dock(self):
"""
Create a default dock for the auto updates.
"""
dock = self.gui.add_dock("default_figure")
dock.add_widget("BECFigure")
self.dock_name = "default_figure"
@staticmethod
def get_scan_info(msg) -> ScanInfo:
"""
Update the script with the given data.
"""
info = msg.info
status = msg.status
scan_id = msg.scan_id
scan_number = info.get("scan_number", 0)
scan_name = info.get("scan_name", "Unknown")
scan_report_devices = info.get("scan_report_devices", [])
monitored_devices = info.get("readout_priority", {}).get("monitored", [])
monitored_devices = [dev for dev in monitored_devices if dev not in scan_report_devices]
return ScanInfo(
scan_id=scan_id,
scan_number=scan_number,
scan_name=scan_name,
scan_report_devices=scan_report_devices,
monitored_devices=monitored_devices,
status=status,
)
def get_default_figure(self) -> BECFigure | None:
"""
Get the default figure from the GUI.
"""
dock = self.gui.panels.get(self.dock_name, [])
if not dock:
return None
widgets = dock.widget_list
if not widgets:
return None
return widgets[0]
def run(self, msg):
"""
Run the update function if enabled.
"""
if not self.enabled:
return
if msg.status != "open":
return
info = self.get_scan_info(msg)
self.handler(info)
def process_queue(self):
"""
Process the message queue.
"""
while True:
msg = self.msg_queue.get()
if msg is self._shutdown_sentinel:
break
self.run(msg)
@staticmethod
def get_selected_device(monitored_devices, selected_device):
"""
Get the selected device for the plot. If no device is selected, the first
device in the monitored devices list is selected.
"""
if selected_device:
return selected_device
if len(monitored_devices) > 0:
sel_device = monitored_devices[0]
return sel_device
return None
def handler(self, info: ScanInfo) -> None:
"""
Default update function.
"""
if info.scan_name == "line_scan" and info.scan_report_devices:
self.simple_line_scan(info)
return
if info.scan_name == "grid_scan" and info.scan_report_devices:
self.simple_grid_scan(info)
return
if info.scan_report_devices:
self.best_effort(info)
return
def simple_line_scan(self, info: ScanInfo) -> None:
"""
Simple line scan.
"""
fig = self.get_default_figure()
if not fig:
return
dev_x = info.scan_report_devices[0]
dev_y = self.get_selected_device(info.monitored_devices, self.gui.selected_device)
if not dev_y:
return
fig.clear_all()
plt = fig.plot(x_name=dev_x, y_name=dev_y, label=f"Scan {info.scan_number} - {dev_y}")
plt.set(title=f"Scan {info.scan_number}", x_label=dev_x, y_label=dev_y)
def simple_grid_scan(self, info: ScanInfo) -> None:
"""
Simple grid scan.
"""
fig = self.get_default_figure()
if not fig:
return
dev_x = info.scan_report_devices[0]
dev_y = info.scan_report_devices[1]
dev_z = self.get_selected_device(info.monitored_devices, self.gui.selected_device)
fig.clear_all()
plt = fig.plot(
x_name=dev_x, y_name=dev_y, z_name=dev_z, label=f"Scan {info.scan_number} - {dev_z}"
)
plt.set(title=f"Scan {info.scan_number}", x_label=dev_x, y_label=dev_y)
def best_effort(self, info: ScanInfo) -> None:
"""
Best effort scan.
"""
fig = self.get_default_figure()
if not fig:
return
dev_x = info.scan_report_devices[0]
dev_y = self.get_selected_device(info.monitored_devices, self.gui.selected_device)
if not dev_y:
return
fig.clear_all()
plt = fig.plot(x_name=dev_x, y_name=dev_y, label=f"Scan {info.scan_number} - {dev_y}")
plt.set(title=f"Scan {info.scan_number}", x_label=dev_x, y_label=dev_y)
def shutdown(self):
"""
Shutdown the auto update thread.
"""
self.msg_queue.put(self._shutdown_sentinel)
if self.auto_update_thread:
self.auto_update_thread.join()

File diff suppressed because it is too large Load Diff

View File

@@ -1,53 +1,65 @@
"""Client utilities for the BEC GUI."""
from __future__ import annotations
import importlib
import importlib.metadata as imd
import json
import os
import select
import subprocess
import threading
import time
from contextlib import contextmanager
from threading import Lock
from typing import TYPE_CHECKING, Literal, TypeAlias, cast
import uuid
from functools import wraps
from typing import TYPE_CHECKING
from bec_lib.client import BECClient
from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from bec_lib.utils.import_utils import lazy_import_from
from rich.console import Console
from rich.table import Table
from bec_lib.utils.import_utils import isinstance_based_on_class_name, lazy_import, lazy_import_from
import bec_widgets.cli.client as client
from bec_widgets.cli.rpc.rpc_base import RPCBase, RPCReference
from bec_widgets.utils.serialization import register_serializer_extension
from bec_widgets.cli.auto_updates import AutoUpdates
if TYPE_CHECKING: # pragma: no cover
from bec_lib.messages import GUIRegistryStateMessage
else:
GUIRegistryStateMessage = lazy_import_from("bec_lib.messages", "GUIRegistryStateMessage")
if TYPE_CHECKING:
from bec_lib.device import DeviceBase
messages = lazy_import("bec_lib.messages")
# from bec_lib.connector import MessageObject
MessageObject = lazy_import_from("bec_lib.connector", ("MessageObject",))
BECDispatcher = lazy_import_from("bec_widgets.utils.bec_dispatcher", ("BECDispatcher",))
logger = bec_logger.logger
IGNORE_WIDGETS = ["LaunchWindow"]
RegistryState: TypeAlias = dict[
Literal["gui_id", "name", "widget_class", "config", "__rpc__", "container_proxy"],
str | bool | dict,
]
# pylint: disable=redefined-outer-scope
def _filter_output(output: str) -> str:
def rpc_call(func):
"""
Filter out the output from the process.
A decorator for calling a function on the server.
Args:
func: The function to call.
Returns:
The result of the function call.
"""
if "IMKClient" in output:
# only relevant on macOS
# see https://discussions.apple.com/thread/255761734?sortBy=rank
return ""
return output
@wraps(func)
def wrapper(self, *args, **kwargs):
# we could rely on a strict type check here, but this is more flexible
# moreover, it would anyway crash for objects...
out = []
for arg in args:
if hasattr(arg, "name"):
arg = arg.name
out.append(arg)
args = tuple(out)
for key, val in kwargs.items():
if hasattr(val, "name"):
kwargs[key] = val.name
if not self.gui_is_alive():
raise RuntimeError("GUI is not alive")
return self._run_rpc(func.__name__, *args, **kwargs)
return wrapper
def _get_output(process, logger) -> None:
@@ -63,7 +75,6 @@ def _get_output(process, logger) -> None:
if stream in readylist:
buf.append(stream.read(4096))
output, _, remaining = "".join(buf).rpartition("\n")
output = _filter_output(output)
if output:
log_func[stream](output)
buf.clear()
@@ -72,13 +83,7 @@ def _get_output(process, logger) -> None:
logger.error(f"Error reading process output: {str(e)}")
def _start_plot_process(
gui_id: str,
gui_class_id: str,
config: dict | str,
gui_class: str = "dock_area",
logger=None, # FIXME change gui_class back to "launcher" later
) -> tuple[subprocess.Popen[str], threading.Thread | None]:
def _start_plot_process(gui_id: str, gui_class: type, config: dict | str, logger=None) -> None:
"""
Start the plot in a new process.
@@ -87,20 +92,11 @@ def _start_plot_process(
process will not be captured.
"""
# pylint: disable=subprocess-run-check
command = [
"bec-gui-server",
"--id",
gui_id,
"--gui_class",
gui_class,
"--gui_class_id",
gui_class_id,
"--hide",
]
command = ["bec-gui-server", "--id", gui_id, "--gui_class", gui_class.__name__]
if config:
if isinstance(config, dict):
config = json.dumps(config)
command.extend(["--config", str(config)])
command.extend(["--config", config])
env_dict = os.environ.copy()
env_dict["PYTHONUNBUFFERED"] = "1"
@@ -130,402 +126,215 @@ def _start_plot_process(
return process, process_output_processing_thread
class RepeatTimer(threading.Timer):
"""RepeatTimer class."""
def run(self):
while not self.finished.wait(self.interval):
self.function(*self.args, **self.kwargs)
# pylint: disable=protected-access
@contextmanager
def wait_for_server(client: BECGuiClient):
"""Context manager to wait for the server to start."""
timeout = client._startup_timeout
if not timeout:
if client._gui_is_alive():
# there is hope, let's wait a bit
timeout = 1
else:
raise RuntimeError("GUI is not alive")
try:
if client._gui_started_event.wait(timeout=timeout):
client._gui_started_timer.cancel()
client._gui_started_timer.join()
else:
raise TimeoutError("Could not connect to GUI server")
finally:
# after initial waiting period, do not wait so much any more
# (only relevant if GUI didn't start)
client._startup_timeout = 0
yield
class WidgetNameSpace:
def __repr__(self):
console = Console()
table = Table(title="Available widgets for BEC CLI usage")
table.add_column("Widget Name", justify="left", style="magenta")
table.add_column("Description", justify="left")
for attr, value in self.__dict__.items():
docs = value.__doc__
docs = docs if docs else "No description available"
table.add_row(attr, docs)
console.print(table)
return ""
class AvailableWidgetsNamespace:
"""Namespace for available widgets in the BEC GUI."""
def __init__(self):
for widget in client.Widgets:
name = widget.value
if name in IGNORE_WIDGETS:
continue
setattr(self, name, name)
def __repr__(self):
console = Console()
table = Table(title="Available widgets for BEC CLI usage")
table.add_column("Widget Name", justify="left", style="magenta")
table.add_column("Description", justify="left")
for attr_name, _ in self.__dict__.items():
docs = getattr(client, attr_name).__doc__
docs = docs if docs else "No description available"
table.add_row(attr_name, docs if len(docs.strip()) > 0 else "No description available")
console.print(table)
return ""
class BECGuiClient(RPCBase):
"""BEC GUI client class. Container for GUI applications within Python."""
class BECGuiClientMixin:
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
self._lock = Lock()
self._anchor_widget = "launcher"
self._killed = False
self._top_level: dict[str, RPCReference] = {}
self._startup_timeout = 0
self._gui_started_timer = None
self._gui_started_event = threading.Event()
self._process = None
self._process_output_processing_thread = None
self._server_registry: dict[str, RegistryState] = {}
self._ipython_registry: dict[str, RPCReference] = {}
self.available_widgets = AvailableWidgetsNamespace()
register_serializer_extension()
self.auto_updates = self._get_update_script()
self._target_endpoint = MessageEndpoints.scan_status()
self._selected_device = None
####################
#### Client API ####
####################
def _get_update_script(self) -> AutoUpdates | None:
eps = imd.entry_points(group="bec.widgets.auto_updates")
for ep in eps:
if ep.name == "plugin_widgets_update":
try:
spec = importlib.util.find_spec(ep.module)
# if the module is not found, we skip it
if spec is None:
continue
return ep.load()(gui=self)
except Exception as e:
logger.error(f"Error loading auto update script from plugin: {str(e)}")
return None
@property
def launcher(self) -> RPCBase:
"""The launcher object."""
return RPCBase(gui_id=f"{self._gui_id}:launcher", parent=self, object_name="launcher")
def selected_device(self):
"""
Selected device for the plot.
"""
return self._selected_device
def connect_to_gui_server(self, gui_id: str) -> None:
"""Connect to a GUI server"""
# Unregister the old callback
self._client.connector.unregister(
MessageEndpoints.gui_registry_state(self._gui_id), cb=self._handle_registry_update
)
self._gui_id = gui_id
@selected_device.setter
def selected_device(self, device: str | DeviceBase):
if isinstance_based_on_class_name(device, "bec_lib.device.DeviceBase"):
self._selected_device = device.name
elif isinstance(device, str):
self._selected_device = device
else:
raise ValueError("Device must be a string or a device object")
# reset the namespace
self._update_dynamic_namespace({})
self._server_registry = {}
self._top_level = {}
self._ipython_registry = {}
# Register the new callback
def _start_update_script(self) -> None:
self._client.connector.register(
MessageEndpoints.gui_registry_state(self._gui_id),
cb=self._handle_registry_update,
parent=self,
from_start=True,
self._target_endpoint, cb=self._handle_msg_update, parent=self
)
@property
def windows(self) -> dict:
"""Dictionary with dock areas in the GUI."""
return {widget.object_name: widget for widget in self._top_level.values()}
@staticmethod
def _handle_msg_update(msg: MessageObject, parent: BECGuiClientMixin) -> None:
if parent.auto_updates is not None:
# pylint: disable=protected-access
parent._update_script_msg_parser(msg.value)
@property
def window_list(self) -> list:
"""List with dock areas in the GUI."""
return list(self._top_level.values())
def _update_script_msg_parser(self, msg: messages.BECMessage) -> None:
if isinstance(msg, messages.ScanStatusMessage):
if not self.gui_is_alive():
return
self.auto_updates.msg_queue.put(msg)
def start(self, wait: bool = False) -> None:
"""Start the GUI server."""
return self._start(wait=wait)
def show(self):
"""Show the GUI window."""
if self._check_if_server_is_alive():
return self._show_all()
return self.start(wait=True)
def hide(self):
"""Hide the GUI window."""
return self._hide_all()
def new(
self,
name: str | None = None,
wait: bool = True,
geometry: tuple[int, int, int, int] | None = None,
launch_script: str = "dock_area",
**kwargs,
) -> client.BECDockArea:
"""Create a new top-level dock area.
Args:
name(str, optional): The name of the dock area. Defaults to None.
wait(bool, optional): Whether to wait for the server to start. Defaults to True.
geometry(tuple[int, int, int, int] | None): The geometry of the dock area (pos_x, pos_y, w, h)
Returns:
client.BECDockArea: The new dock area.
def show(self) -> None:
"""
if not self._check_if_server_is_alive():
self.start(wait=True)
if wait:
with wait_for_server(self):
widget = self.launcher._run_rpc(
"launch", launch_script=launch_script, name=name, geometry=geometry, **kwargs
) # pylint: disable=protected-access
return widget
widget = self.launcher._run_rpc(
"launch", launch_script=launch_script, name=name, geometry=geometry, **kwargs
) # pylint: disable=protected-access
return widget
def delete(self, name: str) -> None:
"""Delete a dock area.
Args:
name(str): The name of the dock area.
Show the figure.
"""
widget = self.windows.get(name)
if widget is None:
raise ValueError(f"Dock area {name} not found.")
widget._run_rpc("close") # pylint: disable=protected-access
def delete_all(self) -> None:
"""Delete all dock areas."""
for widget_name in self.windows:
self.delete(widget_name)
def kill_server(self) -> None:
"""Kill the GUI server."""
# Unregister the registry state
self._killed = True
if self._gui_started_timer is not None:
self._gui_started_timer.cancel()
self._gui_started_timer.join()
if self._process is None or self._process.poll() is not None:
self._start_update_script()
self._process, self._process_output_processing_thread = _start_plot_process(
self._gui_id, self.__class__, self._client._service_config.config, logger=logger
)
while not self.gui_is_alive():
print("Waiting for GUI to start...")
time.sleep(1)
logger.success(f"GUI started with id: {self._gui_id}")
def close(self) -> None:
"""
Close the gui window.
"""
if self._process is None:
return
self._client.shutdown()
if self._process:
logger.success("Stopping GUI...")
self._process.terminate()
if self._process_output_processing_thread:
self._process_output_processing_thread.join()
self._process.wait()
self._process = None
if self.auto_updates is not None:
self.auto_updates.shutdown()
# Unregister the registry state
self._client.connector.unregister(
MessageEndpoints.gui_registry_state(self._gui_id), cb=self._handle_registry_update
class RPCResponseTimeoutError(Exception):
"""Exception raised when an RPC response is not received within the expected time."""
def __init__(self, request_id, timeout):
super().__init__(
f"RPC response not received within {timeout} seconds for request ID {request_id}"
)
# Remove all reference from top level
self._top_level.clear()
self._server_registry.clear()
def close(self):
"""Deprecated. Use kill_server() instead."""
# FIXME, deprecated in favor of kill, will be removed in the future
self.kill_server()
#########################
#### Private methods ####
#########################
class RPCBase:
def __init__(self, gui_id: str = None, config: dict = None, parent=None) -> None:
self._client = BECClient() # BECClient is a singleton; here, we simply get the instance
self._config = config if config is not None else {}
self._gui_id = gui_id if gui_id is not None else str(uuid.uuid4())[:5]
self._parent = parent
self._msg_wait_event = threading.Event()
self._rpc_response = None
super().__init__()
# print(f"RPCBase: {self._gui_id}")
def _check_if_server_is_alive(self):
"""Checks if the process is alive"""
if self._process is None:
return False
if self._process.poll() is not None:
return False
return True
def __repr__(self):
type_ = type(self)
qualname = type_.__qualname__
return f"<{qualname} object at {hex(id(self))}>"
def _gui_post_startup(self):
timeout = 60
# Wait for 'bec' gui to be registered, this may take some time
# After 60s timeout. Should this raise an exception on timeout?
while time.time() < time.time() + timeout:
if len(list(self._server_registry.keys())) < 2 or not hasattr(
self, self._anchor_widget
):
time.sleep(0.1)
else:
break
self._gui_started_event.set()
def _start_server(self, wait: bool = False) -> None:
@property
def _root(self):
"""
Start the GUI server, and execute callback when it is launched
Get the root widget. This is the BECFigure widget that holds
the anchor gui_id.
"""
if self._process is None or self._process.poll() is not None:
logger.success("GUI starting...")
self._startup_timeout = 5
self._gui_started_event.clear()
self._process, self._process_output_processing_thread = _start_plot_process(
self._gui_id,
gui_class_id="bec",
config=self._client._service_config.config, # pylint: disable=protected-access
logger=logger,
parent = self
# pylint: disable=protected-access
while parent._parent is not None:
parent = parent._parent
return parent
def _run_rpc(self, method, *args, wait_for_rpc_response=True, timeout=3, **kwargs):
"""
Run the RPC call.
Args:
method: The method to call.
args: The arguments to pass to the method.
wait_for_rpc_response: Whether to wait for the RPC response.
kwargs: The keyword arguments to pass to the method.
Returns:
The result of the RPC call.
"""
request_id = str(uuid.uuid4())
rpc_msg = messages.GUIInstructionMessage(
action=method,
parameter={"args": args, "kwargs": kwargs, "gui_id": self._gui_id},
metadata={"request_id": request_id},
)
# pylint: disable=protected-access
receiver = self._root._gui_id
if wait_for_rpc_response:
self._rpc_response = None
self._msg_wait_event.clear()
self._client.connector.register(
MessageEndpoints.gui_instruction_response(request_id),
cb=self._on_rpc_response,
parent=self,
)
def gui_started_callback(callback):
try:
if callable(callback):
callback()
finally:
threading.current_thread().cancel() # type: ignore
self._client.connector.set_and_publish(MessageEndpoints.gui_instructions(receiver), rpc_msg)
self._gui_started_timer = RepeatTimer(
0.5, lambda: self._gui_is_alive() and gui_started_callback(self._gui_post_startup)
)
self._gui_started_timer.start()
if wait:
self._gui_started_event.wait()
def _start(self, wait: bool = False) -> None:
self._killed = False
self._client.connector.register(
MessageEndpoints.gui_registry_state(self._gui_id),
cb=self._handle_registry_update,
parent=self,
)
return self._start_server(wait=wait)
if wait_for_rpc_response:
try:
finished = self._msg_wait_event.wait(10)
if not finished:
raise RPCResponseTimeoutError(request_id, timeout)
finally:
self._msg_wait_event.clear()
self._client.connector.unregister(
MessageEndpoints.gui_instruction_response(request_id), cb=self._on_rpc_response
)
# get class name
if not self._rpc_response.accepted:
raise ValueError(self._rpc_response.message["error"])
msg_result = self._rpc_response.message.get("result")
self._rpc_response = None
return self._create_widget_from_msg_result(msg_result)
@staticmethod
def _handle_registry_update(
msg: dict[str, GUIRegistryStateMessage], parent: BECGuiClient
) -> None:
# This was causing a deadlock during shutdown, not sure why.
# with self._lock:
self = parent
self._server_registry = cast(dict[str, RegistryState], msg["data"].state)
self._update_dynamic_namespace(self._server_registry)
def _on_rpc_response(msg: MessageObject, parent: RPCBase) -> None:
msg = msg.value
parent._msg_wait_event.set()
parent._rpc_response = msg
def _do_show_all(self):
rpc_client = RPCBase(gui_id=f"{self._gui_id}:launcher", parent=self)
rpc_client._run_rpc("show") # pylint: disable=protected-access
for window in self._top_level.values():
window.show()
def _create_widget_from_msg_result(self, msg_result):
if msg_result is None:
return None
if isinstance(msg_result, list):
return [self._create_widget_from_msg_result(res) for res in msg_result]
if isinstance(msg_result, dict):
if "__rpc__" not in msg_result:
return {
key: self._create_widget_from_msg_result(val) for key, val in msg_result.items()
}
cls = msg_result.pop("widget_class", None)
msg_result.pop("__rpc__", None)
def _show_all(self):
with wait_for_server(self):
return self._do_show_all()
if not cls:
return msg_result
def _hide_all(self):
with wait_for_server(self):
rpc_client = RPCBase(gui_id=f"{self._gui_id}:launcher", parent=self)
rpc_client._run_rpc("hide") # pylint: disable=protected-access
if not self._killed:
for window in self._top_level.values():
window.hide()
cls = getattr(client, cls)
# print(msg_result)
return cls(parent=self, **msg_result)
return msg_result
def _update_dynamic_namespace(self, server_registry: dict):
def gui_is_alive(self):
"""
Update the dynamic name space with the given server registry.
Setting the server registry to an empty dictionary will remove all widgets from the namespace.
Args:
server_registry (dict): The server registry
Check if the GUI is alive.
"""
top_level_widgets: dict[str, RPCReference] = {}
for gui_id, state in server_registry.items():
widget = self._add_widget(state, self)
if widget is None:
# ignore widgets that are not supported
continue
# get all top-level widgets. These are widgets that have no parent
if not state["config"].get("parent_id"):
top_level_widgets[gui_id] = widget
remove_from_registry = []
for gui_id, widget in self._ipython_registry.items():
if gui_id not in server_registry:
remove_from_registry.append(gui_id)
for gui_id in remove_from_registry:
self._ipython_registry.pop(gui_id)
removed_widgets = [
widget.object_name for widget in self._top_level.values() if widget._is_deleted()
]
for widget_name in removed_widgets:
# the check is not strictly necessary, but better safe
# than sorry; who knows what the user has done
if hasattr(self, widget_name):
delattr(self, widget_name)
for gui_id, widget_ref in top_level_widgets.items():
setattr(self, widget_ref.object_name, widget_ref)
self._top_level = top_level_widgets
for widget in self._ipython_registry.values():
widget._refresh_references()
def _add_widget(self, state: dict, parent: object) -> RPCReference | None:
"""Add a widget to the namespace
Args:
state (dict): The state of the widget from the _server_registry.
parent (object): The parent object.
"""
object_name = state["object_name"]
gui_id = state["gui_id"]
if state["widget_class"] in IGNORE_WIDGETS:
return
widget_class = getattr(client, state["widget_class"], None)
if widget_class is None:
return
obj = self._ipython_registry.get(gui_id)
if obj is None:
widget = widget_class(gui_id=gui_id, object_name=object_name, parent=parent)
self._ipython_registry[gui_id] = widget
else:
widget = obj
obj = RPCReference(registry=self._ipython_registry, gui_id=gui_id)
return obj
if __name__ == "__main__": # pragma: no cover
from bec_lib.client import BECClient
from bec_lib.service_config import ServiceConfig
try:
config = ServiceConfig()
bec_client = BECClient(config)
bec_client.start()
# Test the client_utils.py module
gui = BECGuiClient()
gui.start(wait=True)
gui.new().new(widget="Waveform")
time.sleep(10)
finally:
gui.kill_server()
heart = self._client.connector.get(MessageEndpoints.gui_heartbeat(self._root._gui_id))
if heart is None:
return False
if heart.status == messages.BECStatus.RUNNING:
return True
return False

View File

@@ -2,21 +2,16 @@
from __future__ import annotations
import argparse
import importlib
import inspect
import os
import sys
from pathlib import Path
import black
import isort
from bec_lib.logger import bec_logger
from qtpy.QtCore import Property as QtProperty
from bec_widgets.utils.generate_designer_plugin import DesignerPluginGenerator, plugin_filenames
from bec_widgets.utils.plugin_utils import BECClassContainer, get_custom_classes
logger = bec_logger.logger
from bec_widgets.utils.generate_designer_plugin import DesignerPluginGenerator
from bec_widgets.utils.plugin_utils import BECClassContainer, get_rpc_classes
if sys.version_info >= (3, 11):
from typing import get_overloads
@@ -34,29 +29,13 @@ else:
class ClientGenerator:
def __init__(self, base=False):
self._base = base
base_imports = (
"""import enum
import inspect
import traceback
from functools import reduce
from operator import add
from typing import Literal, Optional
"""
if self._base
else "\n"
)
self.header = f"""# This file was automatically generated by generate_cli.py
# type: ignore \n
def __init__(self):
self.header = """# This file was automatically generated by generate_cli.py\n
from __future__ import annotations
{base_imports}
from bec_lib.logger import bec_logger
import enum
from typing import Literal, Optional, overload
from bec_widgets.cli.rpc.rpc_base import RPCBase, rpc_call
{"from bec_widgets.utils.bec_plugin_helper import get_all_plugin_widgets, get_plugin_client_module" if self._base else ""}
logger = bec_logger.logger
from bec_widgets.cli.client_utils import RPCBase, rpc_call, BECGuiClientMixin
# pylint: skip-file"""
@@ -64,26 +43,18 @@ logger = bec_logger.logger
def generate_client(self, class_container: BECClassContainer):
"""
Generate the client for the published classes, skipping any classes
that have `RPC = False`.
Generate the client for the published classes.
Args:
class_container: The class container with the classes to generate the client for.
"""
# Filter out classes that explicitly have RPC=False
rpc_top_level_classes = [
cls for cls in class_container.rpc_top_level_classes if getattr(cls, "RPC", True)
]
rpc_top_level_classes = class_container.rpc_top_level_classes
rpc_top_level_classes.sort(key=lambda x: x.__name__)
connector_classes = [
cls for cls in class_container.connector_classes if getattr(cls, "RPC", True)
]
connector_classes = class_container.connector_classes
connector_classes.sort(key=lambda x: x.__name__)
self.write_client_enum(rpc_top_level_classes)
for cls in connector_classes:
logger.debug(f"generating RPC client class for {cls.__name__}")
self.content += "\n\n"
self.generate_content_for_class(cls)
@@ -91,50 +62,14 @@ logger = bec_logger.logger
"""
Write the client enum to the content.
"""
if self._base:
self.content += """
class _WidgetsEnumType(str, enum.Enum):
\"\"\" Enum for the available widgets, to be generated programatically \"\"\"
...
"""
self.content += """
_Widgets = {
class Widgets(str, enum.Enum):
\"\"\"
Enum for the available widgets.
\"\"\"
"""
for cls in published_classes:
self.content += f'"{cls.__name__}": "{cls.__name__}",\n '
self.content += """}
"""
if self._base:
self.content += """
try:
_plugin_widgets = get_all_plugin_widgets()
plugin_client = get_plugin_client_module()
Widgets = _WidgetsEnumType("Widgets", {name: name for name in _plugin_widgets} | _Widgets)
if (_overlap := _Widgets.keys() & _plugin_widgets.keys()) != set():
for _widget in _overlap:
logger.warning(f"Detected duplicate widget {_widget} in plugin repo file: {inspect.getfile(_plugin_widgets[_widget])} !")
for plugin_name, plugin_class in inspect.getmembers(plugin_client, inspect.isclass):
if issubclass(plugin_class, RPCBase) and plugin_class is not RPCBase:
if plugin_name in globals():
conflicting_file = (
inspect.getfile(_plugin_widgets[plugin_name])
if plugin_name in _plugin_widgets
else f"{plugin_client}"
)
logger.warning(
f"Plugin widget {plugin_name} from {conflicting_file} conflicts with a built-in class!"
)
continue
if plugin_name not in _overlap:
globals()[plugin_name] = plugin_class
except ImportError as e:
logger.error(f"Failed loading plugins: \\n{reduce(add, traceback.format_exception(e))}")
"""
self.content += f'{cls.__name__} = "{cls.__name__}"\n '
def generate_content_for_class(self, cls):
"""
@@ -146,28 +81,16 @@ except ImportError as e:
class_name = cls.__name__
if class_name == "BECDockArea":
# Generate the content
if cls.__name__ == "BECDockArea":
self.content += f"""
class {class_name}(RPCBase):"""
class {class_name}(RPCBase, BECGuiClientMixin):"""
else:
self.content += f"""
class {class_name}(RPCBase):"""
if cls.__doc__:
# We only want the first line of the docstring
# But skip the first line if it's a blank line
first_line = cls.__doc__.split("\n")[0]
if first_line:
class_docs = first_line
else:
class_docs = cls.__doc__.split("\n")[1]
self.content += f"""
\"\"\"{class_docs}\"\"\"
"""
if not cls.USER_ACCESS:
self.content += """...
"""
for method in cls.USER_ACCESS:
is_property_setter = False
obj = getattr(cls, method, None)
@@ -177,10 +100,8 @@ class {class_name}(RPCBase):"""
method = method.split(".setter")[0]
if obj is None:
raise AttributeError(
f"Method {method} not found in class {cls.__name__}. "
f"Please check the USER_ACCESS list."
f"Method {method} not found in class {cls.__name__}. Please check the USER_ACCESS list."
)
if isinstance(obj, (property, QtProperty)):
# for the cli, we can map qt properties to regular properties
if is_property_setter:
@@ -223,18 +144,18 @@ class {class_name}(RPCBase):"""
# Combine header and content, then format with black
full_content = self.header + "\n" + self.content
try:
formatted_content = black.format_str(full_content, mode=black.Mode(line_length=100))
formatted_content = black.format_str(full_content, mode=black.FileMode(line_length=100))
except black.NothingChanged:
formatted_content = full_content
config = isort.Config(
isort.Config(
profile="black",
line_length=100,
multi_line_output=3,
include_trailing_comma=False,
include_trailing_comma=True,
known_first_party=["bec_widgets"],
)
formatted_content = isort.code(formatted_content, config=config)
formatted_content = isort.code(formatted_content)
with open(file_name, "w", encoding="utf-8") as file:
file.write(formatted_content)
@@ -246,78 +167,41 @@ def main():
"""
parser = argparse.ArgumentParser(description="Auto-generate the client for RPC widgets")
parser.add_argument(
"--target",
action="store",
type=str,
help="Which package to generate plugin files for. Should be installed in the local environment (example: my_plugin_repo)",
)
parser.add_argument("--core", action="store_true", help="Whether to generate the core client")
args = parser.parse_args()
if args.target is None:
logger.error(
"You must provide a target - for safety, the default of running this on bec_widgets core has been removed. To generate the client for bec_widgets, run `bw-generate-cli --target bec_widgets`"
)
return
logger.info(f"BEC Widget code generation tool started with args: {args}")
if args.core:
current_path = os.path.dirname(__file__)
client_path = os.path.join(current_path, "client.py")
client_subdir = "cli" if args.target == "bec_widgets" else "widgets"
module_name = "bec_widgets" if args.target == "bec_widgets" else f"{args.target}.bec_widgets"
rpc_classes = get_rpc_classes("bec_widgets")
try:
module = importlib.import_module(module_name)
assert module.__file__ is not None
module_file = Path(module.__file__)
module_dir = module_file.parent if module_file.is_file() else module_file
except Exception as e:
logger.error(f"Failed to load module {module_name} for code generation: {e}")
return
generator = ClientGenerator()
generator.generate_client(rpc_classes)
generator.write(client_path)
client_path = module_dir / client_subdir / "client.py"
for cls in rpc_classes.plugins:
plugin = DesignerPluginGenerator(cls)
if not hasattr(plugin, "info"):
continue
rpc_classes = get_custom_classes(module_name)
logger.info(f"Obtained classes with RPC objects: {rpc_classes!r}")
generator = ClientGenerator(base=module_name == "bec_widgets")
logger.info(f"Generating client file at {client_path}")
generator.generate_client(rpc_classes)
generator.write(str(client_path))
if module_name != "bec_widgets":
non_overwrite_classes = list(clsinfo.name for clsinfo in get_custom_classes("bec_widgets"))
logger.info(
f"Not writing plugins which would conflict with builtin classes: {non_overwrite_classes}"
)
else:
non_overwrite_classes = []
for cls in rpc_classes.plugins:
logger.info(f"Writing bec-designer plugin files for {cls.__name__}...")
if cls.__name__ in non_overwrite_classes:
logger.error(
f"Not writing plugin files for {cls.__name__} because a built-in widget with that name exists"
)
plugin = DesignerPluginGenerator(cls)
if not hasattr(plugin, "info"):
continue
def _exists(file: str):
return os.path.exists(os.path.join(plugin.info.base_path, file))
if any(_exists(file) for file in plugin_filenames(plugin.info.plugin_name_snake)):
logger.debug(
f"Skipping generation of extra plugin files for {plugin.info.plugin_name_snake} - at least one file out of 'plugin.py', 'pyproject', and 'register_{plugin.info.plugin_name_snake}.py' already exists."
)
continue
plugin.run()
# if the class directory already has a register, plugin and pyproject file, skip
if os.path.exists(
os.path.join(plugin.info.base_path, f"register_{plugin.info.plugin_name_snake}.py")
):
continue
if os.path.exists(
os.path.join(plugin.info.base_path, f"{plugin.info.plugin_name_snake}_plugin.py")
):
continue
if os.path.exists(
os.path.join(plugin.info.base_path, f"{plugin.info.plugin_name_snake}.pyproject")
):
continue
plugin.run()
if __name__ == "__main__": # pragma: no cover
import sys
sys.argv = ["bw-generate-cli", "--target", "bec_widgets"]
sys.argv = ["generate_cli.py", "--core"]
main()

View File

@@ -1,308 +0,0 @@
from __future__ import annotations
import inspect
import threading
import uuid
from functools import wraps
from typing import TYPE_CHECKING, Any, cast
from bec_lib.client import BECClient
from bec_lib.endpoints import MessageEndpoints
from bec_lib.utils.import_utils import lazy_import, lazy_import_from
if TYPE_CHECKING: # pragma: no cover
from bec_lib import messages
from bec_lib.connector import MessageObject
import bec_widgets.cli.client as client
from bec_widgets.cli.client_utils import BECGuiClient
else:
client = lazy_import("bec_widgets.cli.client") # avoid circular import
messages = lazy_import("bec_lib.messages")
MessageObject = lazy_import_from("bec_lib.connector", ("MessageObject",))
# pylint: disable=protected-access
def rpc_call(func):
"""
A decorator for calling a function on the server.
Args:
func: The function to call.
Returns:
The result of the function call.
"""
@wraps(func)
def wrapper(self, *args, **kwargs):
# we could rely on a strict type check here, but this is more flexible
# moreover, it would anyway crash for objects...
caller_frame = inspect.currentframe().f_back # type: ignore
while caller_frame:
if "jedi" in caller_frame.f_globals:
# Jedi module is present, likely tab completion
# Do not run the RPC call
return None # func(*args, **kwargs)
caller_frame = caller_frame.f_back
out = []
for arg in args:
if hasattr(arg, "name"):
arg = arg.name
out.append(arg)
args = tuple(out)
for key, val in kwargs.items():
if hasattr(val, "name"):
kwargs[key] = val.name
if not self._root._gui_is_alive():
raise RuntimeError("GUI is not alive")
return self._run_rpc(func.__name__, *args, **kwargs)
return wrapper
class RPCResponseTimeoutError(Exception):
"""Exception raised when an RPC response is not received within the expected time."""
def __init__(self, request_id, timeout):
super().__init__(
f"RPC response not received within {timeout} seconds for request ID {request_id}"
)
class DeletedWidgetError(Exception): ...
def check_for_deleted_widget(func):
@wraps(func)
def wrapper(self, *args, **kwargs):
if self._gui_id not in self._registry:
raise DeletedWidgetError(f"Widget with gui_id {self._gui_id} has been deleted")
return func(self, *args, **kwargs)
return wrapper
class RPCReference:
def __init__(self, registry: dict, gui_id: str) -> None:
self._registry = registry
self._gui_id = gui_id
self.object_name = self._registry[self._gui_id].object_name
@check_for_deleted_widget
def __getattr__(self, name):
if name in ["_registry", "_gui_id", "_is_deleted", "object_name"]:
return super().__getattribute__(name)
return self._registry[self._gui_id].__getattribute__(name)
def __setattr__(self, name, value):
if name in ["_registry", "_gui_id", "_is_deleted", "object_name"]:
return super().__setattr__(name, value)
if self._gui_id not in self._registry:
raise DeletedWidgetError(f"Widget with gui_id {self._gui_id} has been deleted")
self._registry[self._gui_id].__setattr__(name, value)
def __repr__(self):
if self._gui_id not in self._registry:
return f"<Deleted widget with gui_id {self._gui_id}>"
return self._registry[self._gui_id].__repr__()
def __str__(self):
if self._gui_id not in self._registry:
return f"<Deleted widget with gui_id {self._gui_id}>"
return self._registry[self._gui_id].__str__()
def __dir__(self):
if self._gui_id not in self._registry:
return []
return self._registry[self._gui_id].__dir__()
def _is_deleted(self) -> bool:
return self._gui_id not in self._registry
class RPCBase:
def __init__(
self,
gui_id: str | None = None,
config: dict | None = None,
object_name: str | None = None,
parent=None,
**kwargs,
) -> None:
self._client = BECClient() # BECClient is a singleton; here, we simply get the instance
self._config = config if config is not None else {}
self._gui_id = gui_id if gui_id is not None else str(uuid.uuid4())[:5]
self.object_name = object_name if object_name is not None else str(uuid.uuid4())[:5]
self._parent = parent
self._msg_wait_event = threading.Event()
self._rpc_response = None
super().__init__()
self._rpc_references: dict[str, str] = {}
def __repr__(self):
type_ = type(self)
qualname = type_.__qualname__
return f"<{qualname} with name: {self.object_name}>"
def remove(self):
"""
Remove the widget.
"""
obj = self._root._server_registry.get(self._gui_id)
if obj is None:
raise ValueError(f"Widget {self._gui_id} not found.")
if proxy := obj.get("container_proxy"):
assert isinstance(proxy, str)
self._run_rpc("remove", gui_id=proxy)
return
self._run_rpc("remove")
@property
def _root(self) -> BECGuiClient:
"""
Get the root widget. This is the BECFigure widget that holds
the anchor gui_id.
"""
parent = self
# pylint: disable=protected-access
while parent._parent is not None:
parent = parent._parent
return parent # type: ignore
def _run_rpc(
self,
method,
*args,
wait_for_rpc_response=True,
timeout=5,
gui_id: str | None = None,
**kwargs,
) -> Any:
"""
Run the RPC call.
Args:
method: The method to call.
args: The arguments to pass to the method.
wait_for_rpc_response: Whether to wait for the RPC response.
timeout: The timeout for the RPC response.
gui_id: The GUI ID to use for the RPC call. If None, the default GUI ID is used.
kwargs: The keyword arguments to pass to the method.
Returns:
The result of the RPC call.
"""
request_id = str(uuid.uuid4())
rpc_msg = messages.GUIInstructionMessage(
action=method,
parameter={"args": args, "kwargs": kwargs, "gui_id": gui_id or self._gui_id},
metadata={"request_id": request_id},
)
# pylint: disable=protected-access
receiver = self._root._gui_id
if wait_for_rpc_response:
self._rpc_response = None
self._msg_wait_event.clear()
self._client.connector.register(
MessageEndpoints.gui_instruction_response(request_id),
cb=self._on_rpc_response,
parent=self,
)
self._client.connector.set_and_publish(MessageEndpoints.gui_instructions(receiver), rpc_msg)
if wait_for_rpc_response:
try:
finished = self._msg_wait_event.wait(timeout)
if not finished:
raise RPCResponseTimeoutError(request_id, timeout)
finally:
self._msg_wait_event.clear()
self._client.connector.unregister(
MessageEndpoints.gui_instruction_response(request_id), cb=self._on_rpc_response
)
# we can assume that the response is a RequestResponseMessage, updated by
# the _on_rpc_response method
assert isinstance(self._rpc_response, messages.RequestResponseMessage)
if not self._rpc_response.accepted:
raise ValueError(self._rpc_response.message["error"])
msg_result = self._rpc_response.message.get("result")
self._rpc_response = None
return self._create_widget_from_msg_result(msg_result)
@staticmethod
def _on_rpc_response(msg_obj: MessageObject, parent: RPCBase) -> None:
msg = cast(messages.RequestResponseMessage, msg_obj.value)
parent._rpc_response = msg
parent._msg_wait_event.set()
def _create_widget_from_msg_result(self, msg_result):
if msg_result is None:
return None
if isinstance(msg_result, list):
return [self._create_widget_from_msg_result(res) for res in msg_result]
if isinstance(msg_result, dict):
if "__rpc__" not in msg_result:
return {
key: self._create_widget_from_msg_result(val) for key, val in msg_result.items()
}
cls = msg_result.pop("widget_class", None)
msg_result.pop("__rpc__", None)
if not cls:
return msg_result
cls = getattr(client, cls)
# The namespace of the object will be updated dynamically on the client side
# Therefore it is important to check if the object is already in the registry
# If yes, we return the reference to the object, otherwise we create a new object
# pylint: disable=protected-access
if msg_result["gui_id"] in self._root._ipython_registry:
return RPCReference(self._root._ipython_registry, msg_result["gui_id"])
ret = cls(parent=self, **msg_result)
self._root._ipython_registry[ret._gui_id] = ret
self._refresh_references()
obj = RPCReference(self._root._ipython_registry, ret._gui_id)
return obj
# return ret
return msg_result
def _gui_is_alive(self):
"""
Check if the GUI is alive.
"""
heart = self._client.connector.get(MessageEndpoints.gui_heartbeat(self._root._gui_id))
if heart is None:
return False
if heart.status == messages.BECStatus.RUNNING:
return True
return False
def _refresh_references(self):
"""
Refresh the references.
"""
with self._root._lock:
references = {}
for key, val in self._root._server_registry.items():
parent_id = val["config"].get("parent_id")
if parent_id == self._gui_id:
references[key] = {
"gui_id": val["config"]["gui_id"],
"object_name": val["object_name"],
}
removed_references = set(self._rpc_references.keys()) - set(references.keys())
for key in removed_references:
delattr(self, self._rpc_references[key]["object_name"])
self._rpc_references = references
for key, val in references.items():
setattr(
self,
val["object_name"],
RPCReference(self._root._ipython_registry, val["gui_id"]),
)

View File

@@ -1,189 +0,0 @@
from __future__ import annotations
from functools import wraps
from threading import RLock
from typing import TYPE_CHECKING, Callable
from weakref import WeakValueDictionary
from bec_lib.logger import bec_logger
from qtpy.QtCore import QObject
if TYPE_CHECKING: # pragma: no cover
from bec_widgets.utils.bec_connector import BECConnector
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.widgets.containers.dock.dock import BECDock
from bec_widgets.widgets.containers.dock.dock_area import BECDockArea
logger = bec_logger.logger
def broadcast_update(func):
"""
Decorator to broadcast updates to the RPCRegister whenever a new RPC object is added or removed.
If class attribute _skip_broadcast is set to True, the broadcast will be skipped
"""
@wraps(func)
def wrapper(self, *args, **kwargs):
result = func(self, *args, **kwargs)
self.broadcast()
return result
return wrapper
class RPCRegister:
"""
A singleton class that keeps track of all the RPC objects registered in the system for CLI usage.
"""
_instance = None
_initialized = False
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super(RPCRegister, cls).__new__(cls)
cls._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
self._rpc_register = WeakValueDictionary()
self._broadcast_on_hold = RPCRegisterBroadcast(self)
self._lock = RLock()
self._skip_broadcast = False
self._initialized = True
self.callbacks = []
@classmethod
def delayed_broadcast(cls):
"""
Delay the broadcast of the update to all the callbacks.
"""
register = cls()
return register._broadcast_on_hold
@broadcast_update
def add_rpc(self, rpc: BECConnector):
"""
Add an RPC object to the register.
Args:
rpc(QObject): The RPC object to be added to the register.
"""
if not hasattr(rpc, "gui_id"):
raise ValueError("RPC object must have a 'gui_id' attribute.")
self._rpc_register[rpc.gui_id] = rpc
@broadcast_update
def remove_rpc(self, rpc: BECConnector):
"""
Remove an RPC object from the register.
Args:
rpc(str): The RPC object to be removed from the register.
"""
if not hasattr(rpc, "gui_id"):
raise ValueError(f"RPC object {rpc} must have a 'gui_id' attribute.")
self._rpc_register.pop(rpc.gui_id, None)
def get_rpc_by_id(self, gui_id: str) -> QObject | None:
"""
Get an RPC object by its ID.
Args:
gui_id(str): The ID of the RPC object to be retrieved.
Returns:
QObject | None: The RPC object with the given ID or None
"""
rpc_object = self._rpc_register.get(gui_id, None)
return rpc_object
def list_all_connections(self) -> dict:
"""
List all the registered RPC objects.
Returns:
dict: A dictionary containing all the registered RPC objects.
"""
with self._lock:
connections = dict(self._rpc_register)
return connections
def get_names_of_rpc_by_class_type(
self, cls: type[BECWidget] | type[BECConnector] | type[BECDock] | type[BECDockArea]
) -> list[str]:
"""Get all the names of the widgets.
Args:
cls(BECWidget | BECConnector): The class of the RPC object to be retrieved.
"""
# This retrieves any rpc objects that are subclass of BECWidget,
# i.e. curve and image items are excluded
widgets = [rpc for rpc in self._rpc_register.values() if isinstance(rpc, cls)]
return [widget.object_name for widget in widgets]
def broadcast(self):
"""
Broadcast the update to all the callbacks.
"""
if self._skip_broadcast:
return
connections = self.list_all_connections()
for callback in self.callbacks:
callback(connections)
def object_is_registered(self, obj: BECConnector) -> bool:
"""
Check if an object is registered in the RPC register.
Args:
obj(QObject): The object to check.
Returns:
bool: True if the object is registered, False otherwise.
"""
return obj.gui_id in self._rpc_register
def add_callback(self, callback: Callable[[dict], None]):
"""
Add a callback that will be called whenever the registry is updated.
Args:
callback(Callable[[dict], None]): The callback to be added. It should accept a dictionary of all the
registered RPC objects as an argument.
"""
self.callbacks.append(callback)
@classmethod
def reset_singleton(cls):
"""
Reset the singleton instance.
"""
cls._instance = None
cls._initialized = False
class RPCRegisterBroadcast:
"""Context manager for RPCRegister broadcast."""
def __init__(self, rpc_register: RPCRegister) -> None:
self.rpc_register = rpc_register
self._call_depth = 0
def __enter__(self):
"""Enter the context manager"""
self._call_depth += 1 # Needed for nested calls
self.rpc_register._skip_broadcast = True
return self.rpc_register
def __exit__(self, *exc):
"""Exit the context manager"""
self._call_depth -= 1 # Remove nested calls
if self._call_depth == 0: # The Last one to exit is responsible for broadcasting
self.rpc_register._skip_broadcast = False
self.rpc_register.broadcast()

View File

@@ -0,0 +1,80 @@
from threading import Lock
from weakref import WeakValueDictionary
from qtpy.QtCore import QObject
class RPCRegister:
"""
A singleton class that keeps track of all the RPC objects registered in the system for CLI usage.
"""
_instance = None
_initialized = False
_lock = Lock()
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super(RPCRegister, cls).__new__(cls)
cls._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
self._rpc_register = WeakValueDictionary()
self._initialized = True
def add_rpc(self, rpc: QObject):
"""
Add an RPC object to the register.
Args:
rpc(QObject): The RPC object to be added to the register.
"""
if not hasattr(rpc, "gui_id"):
raise ValueError("RPC object must have a 'gui_id' attribute.")
self._rpc_register[rpc.gui_id] = rpc
def remove_rpc(self, rpc: str):
"""
Remove an RPC object from the register.
Args:
rpc(str): The RPC object to be removed from the register.
"""
if not hasattr(rpc, "gui_id"):
raise ValueError(f"RPC object {rpc} must have a 'gui_id' attribute.")
self._rpc_register.pop(rpc.gui_id, None)
def get_rpc_by_id(self, gui_id: str) -> QObject:
"""
Get an RPC object by its ID.
Args:
gui_id(str): The ID of the RPC object to be retrieved.
Returns:
QObject: The RPC object with the given ID.
"""
rpc_object = self._rpc_register.get(gui_id, None)
return rpc_object
def list_all_connections(self) -> dict:
"""
List all the registered RPC objects.
Returns:
dict: A dictionary containing all the registered RPC objects.
"""
with self._lock:
connections = dict(self._rpc_register)
return connections
@classmethod
def reset_singleton(cls):
"""
Reset the singleton instance.
"""
cls._instance = None
cls._initialized = False

View File

@@ -1,9 +1,4 @@
from __future__ import annotations
from bec_widgets.cli.client_utils import IGNORE_WIDGETS
from bec_widgets.utils.bec_plugin_helper import get_all_plugin_widgets
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.plugin_utils import get_custom_classes
from bec_widgets.utils import BECConnector
class RPCWidgetHandler:
@@ -13,7 +8,7 @@ class RPCWidgetHandler:
self._widget_classes = None
@property
def widget_classes(self) -> dict[str, type[BECWidget]]:
def widget_classes(self):
"""
Get the available widget classes.
@@ -22,7 +17,7 @@ class RPCWidgetHandler:
"""
if self._widget_classes is None:
self.update_available_widgets()
return self._widget_classes # type: ignore
return self._widget_classes
def update_available_widgets(self):
"""
@@ -31,24 +26,25 @@ class RPCWidgetHandler:
Returns:
None
"""
clss = get_custom_classes("bec_widgets")
self._widget_classes = get_all_plugin_widgets() | {
cls.__name__: cls for cls in clss.widgets if cls.__name__ not in IGNORE_WIDGETS
}
from bec_widgets.utils.plugin_utils import get_rpc_classes
def create_widget(self, widget_type, **kwargs) -> BECWidget:
clss = get_rpc_classes("bec_widgets")
self._widget_classes = {cls.__name__: cls for cls in clss.top_level_classes}
def create_widget(self, widget_type, **kwargs) -> BECConnector:
"""
Create a widget from an RPC message.
Args:
widget_type(str): The type of the widget.
name (str): The name of the widget.
**kwargs: The keyword arguments for the widget.
Returns:
widget(BECWidget): The created widget.
widget(BECConnector): The created widget.
"""
widget_class = self.widget_classes.get(widget_type) # type: ignore
if self._widget_classes is None:
self.update_available_widgets()
widget_class = self._widget_classes.get(widget_type)
if widget_class:
return widget_class(**kwargs)
raise ValueError(f"Unknown widget type: {widget_type}")

View File

@@ -1,27 +1,140 @@
from __future__ import annotations
import argparse
import inspect
import json
import os
import signal
import sys
from contextlib import redirect_stderr, redirect_stdout
from typing import cast
from typing import Union
from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from bec_lib.service_config import ServiceConfig
from qtpy.QtCore import QSize, Qt
from qtpy.QtGui import QIcon
from qtpy.QtWidgets import QApplication
from bec_lib.utils.import_utils import lazy_import
from qtpy.QtCore import QTimer
import bec_widgets
from bec_widgets.applications.launch_window import LaunchWindow
from bec_widgets.cli.rpc.rpc_register import RPCRegister
from bec_widgets.utils.bec_dispatcher import BECDispatcher
from bec_widgets.cli.rpc_register import RPCRegister
from bec_widgets.utils import BECDispatcher
from bec_widgets.utils.bec_connector import BECConnector
from bec_widgets.utils.bec_dispatcher import QtRedisConnector
from bec_widgets.widgets.dock.dock_area import BECDockArea
from bec_widgets.widgets.figure import BECFigure
messages = lazy_import("bec_lib.messages")
logger = bec_logger.logger
MODULE_PATH = os.path.dirname(bec_widgets.__file__)
class BECWidgetsCLIServer:
def __init__(
self,
gui_id: str,
dispatcher: BECDispatcher = None,
client=None,
config=None,
gui_class: Union[BECFigure, BECDockArea] = BECFigure,
) -> None:
self.status = messages.BECStatus.BUSY
self.dispatcher = BECDispatcher(config=config) if dispatcher is None else dispatcher
self.client = self.dispatcher.client if client is None else client
self.client.start()
self.gui_id = gui_id
self.gui = gui_class(gui_id=self.gui_id)
self.rpc_register = RPCRegister()
self.rpc_register.add_rpc(self.gui)
self.dispatcher.connect_slot(
self.on_rpc_update, MessageEndpoints.gui_instructions(self.gui_id)
)
# Setup QTimer for heartbeat
self._heartbeat_timer = QTimer()
self._heartbeat_timer.timeout.connect(self.emit_heartbeat)
self._heartbeat_timer.start(200)
self.status = messages.BECStatus.RUNNING
logger.success(f"Server started with gui_id: {self.gui_id}")
def on_rpc_update(self, msg: dict, metadata: dict):
request_id = metadata.get("request_id")
logger.debug(f"Received RPC instruction: {msg}, metadata: {metadata}")
try:
obj = self.get_object_from_config(msg["parameter"])
method = msg["action"]
args = msg["parameter"].get("args", [])
kwargs = msg["parameter"].get("kwargs", {})
res = self.run_rpc(obj, method, args, kwargs)
except Exception as e:
logger.error(f"Error while executing RPC instruction: {e}")
self.send_response(request_id, False, {"error": str(e)})
else:
logger.debug(f"RPC instruction executed successfully: {res}")
self.send_response(request_id, True, {"result": res})
def send_response(self, request_id: str, accepted: bool, msg: dict):
self.client.connector.set_and_publish(
MessageEndpoints.gui_instruction_response(request_id),
messages.RequestResponseMessage(accepted=accepted, message=msg),
expire=60,
)
def get_object_from_config(self, config: dict):
gui_id = config.get("gui_id")
obj = self.rpc_register.get_rpc_by_id(gui_id)
if obj is None:
raise ValueError(f"Object with gui_id {gui_id} not found")
return obj
def run_rpc(self, obj, method, args, kwargs):
logger.debug(f"Running RPC instruction: {method} with args: {args}, kwargs: {kwargs}")
method_obj = getattr(obj, method)
# check if the method accepts args and kwargs
if not callable(method_obj):
if not args:
res = method_obj
else:
setattr(obj, method, args[0])
res = None
else:
sig = inspect.signature(method_obj)
if sig.parameters:
res = method_obj(*args, **kwargs)
else:
res = method_obj()
if isinstance(res, list):
res = [self.serialize_object(obj) for obj in res]
elif isinstance(res, dict):
res = {key: self.serialize_object(val) for key, val in res.items()}
else:
res = self.serialize_object(res)
return res
def serialize_object(self, obj):
if isinstance(obj, BECConnector):
return {
"gui_id": obj.gui_id,
"widget_class": obj.__class__.__name__,
"config": obj.config.model_dump(),
"__rpc__": True,
}
return obj
def emit_heartbeat(self):
logger.trace(f"Emitting heartbeat for {self.gui_id}")
self.client.connector.set(
MessageEndpoints.gui_heartbeat(self.gui_id),
messages.StatusMessage(name=self.gui_id, status=self.status, info={}),
expire=10,
)
def shutdown(self): # TODO not sure if needed when cleanup is done at level of BECConnector
logger.info(f"Shutting down server with gui_id: {self.gui_id}")
self.status = messages.BECStatus.IDLE
self._heartbeat_timer.stop()
self.emit_heartbeat()
self.gui.close()
self.client.shutdown()
class SimpleFileLikeFromLogOutputFunc:
@@ -42,137 +155,100 @@ class SimpleFileLikeFromLogOutputFunc:
return
class GUIServer:
"""
This class is used to start the BEC GUI and is the main entry point for launching BEC Widgets in a subprocess.
"""
def _start_server(gui_id: str, gui_class: Union[BECFigure, BECDockArea], config: str | None = None):
if config:
try:
config = json.loads(config)
service_config = ServiceConfig(config=config)
except (json.JSONDecodeError, TypeError):
service_config = ServiceConfig(config_path=config)
else:
# if no config is provided, use the default config
service_config = ServiceConfig()
def __init__(self, args):
self.config = args.config
self.gui_id = args.id
self.gui_class = args.gui_class
self.gui_class_id = args.gui_class_id
self.hide = args.hide
self.app: QApplication | None = None
self.launcher_window: LaunchWindow | None = None
self.dispatcher: BECDispatcher | None = None
def start(self):
"""
Start the GUI server.
"""
bec_logger.level = bec_logger.LOGLEVEL.INFO
if self.hide:
# pylint: disable=protected-access
bec_logger._stderr_log_level = bec_logger.LOGLEVEL.ERROR
bec_logger._update_sinks()
with redirect_stdout(SimpleFileLikeFromLogOutputFunc(logger.info)): # type: ignore
with redirect_stderr(SimpleFileLikeFromLogOutputFunc(logger.error)): # type: ignore
self._run()
def _get_service_config(self) -> ServiceConfig:
if self.config:
try:
config = json.loads(self.config)
service_config = ServiceConfig(config=config)
except (json.JSONDecodeError, TypeError):
service_config = ServiceConfig(config_path=config)
else:
# if no config is provided, use the default config
service_config = ServiceConfig()
return service_config
def _run(self):
"""
Run the GUI server.
"""
self.app = QApplication(sys.argv)
self.app.setApplicationName("BEC")
self.app.gui_id = self.gui_id # type: ignore
self.setup_bec_icon()
service_config = self._get_service_config()
self.dispatcher = BECDispatcher(config=service_config, gui_id=self.gui_id)
# self.dispatcher.start_cli_server(gui_id=self.gui_id)
self.launcher_window = LaunchWindow(gui_id=f"{self.gui_id}:launcher")
self.launcher_window.setAttribute(Qt.WA_ShowWithoutActivating) # type: ignore
self.app.aboutToQuit.connect(self.shutdown)
self.app.setQuitOnLastWindowClosed(False)
if self.gui_class:
# If the server is started with a specific gui class, we launch it.
# This will automatically hide the launcher.
self.launcher_window.launch(self.gui_class, name=self.gui_class_id)
def sigint_handler(*args):
# display message, for people to let it terminate gracefully
print("Caught SIGINT, exiting")
# Widgets should be all closed.
with RPCRegister.delayed_broadcast():
for widget in QApplication.instance().topLevelWidgets(): # type: ignore
widget.close()
if self.app:
self.app.quit()
signal.signal(signal.SIGINT, sigint_handler)
signal.signal(signal.SIGTERM, sigint_handler)
sys.exit(self.app.exec())
def setup_bec_icon(self):
"""
Set the BEC icon for the application
"""
if self.app is None:
return
icon = QIcon()
icon.addFile(
os.path.join(MODULE_PATH, "assets", "app_icons", "bec_widgets_icon.png"),
size=QSize(48, 48),
)
self.app.setWindowIcon(icon)
def shutdown(self):
"""
Shutdown the GUI server.
"""
if self.dispatcher:
self.dispatcher.stop_cli_server()
self.dispatcher.disconnect_all()
# bec_logger.configure(
# service_config.redis,
# QtRedisConnector,
# service_name="BECWidgetsCLIServer",
# service_config=service_config.service_config,
# )
server = BECWidgetsCLIServer(gui_id=gui_id, config=service_config, gui_class=gui_class)
return server
def main():
"""
Main entry point for subprocesses that start a GUI server.
"""
import argparse
import os
from qtpy.QtCore import QSize
from qtpy.QtGui import QIcon
from qtpy.QtWidgets import QApplication, QMainWindow
import bec_widgets
bec_logger.level = bec_logger.LOGLEVEL.DEBUG
if __name__ != "__main__":
# if not running as main, set the log level to critical
# pylint: disable=protected-access
bec_logger._stderr_log_level = bec_logger.LOGLEVEL.CRITICAL
parser = argparse.ArgumentParser(description="BEC Widgets CLI Server")
parser.add_argument("--id", type=str, default="test", help="The id of the server")
parser.add_argument("--id", type=str, help="The id of the server")
parser.add_argument(
"--gui_class",
type=str,
help="Name of the gui class to be rendered. Possible values: \n- BECFigure\n- BECDockArea",
)
parser.add_argument(
"--gui_class_id",
type=str,
default="bec",
help="The id of the gui class that is added to the QApplication",
)
parser.add_argument("--config", type=str, help="Config file or config string.")
parser.add_argument("--hide", action="store_true", help="Hide on startup")
args = parser.parse_args()
server = GUIServer(args)
server.start()
if args.gui_class == "BECFigure":
gui_class = BECFigure
elif args.gui_class == "BECDockArea":
gui_class = BECDockArea
else:
print(
"Please specify a valid gui_class to run. Use -h for help."
"\n Starting with default gui_class BECFigure."
)
gui_class = BECFigure
with redirect_stdout(SimpleFileLikeFromLogOutputFunc(logger.info)):
with redirect_stderr(SimpleFileLikeFromLogOutputFunc(logger.error)):
app = QApplication(sys.argv)
app.setApplicationName("BEC Figure")
module_path = os.path.dirname(bec_widgets.__file__)
icon = QIcon()
icon.addFile(
os.path.join(module_path, "assets", "app_icons", "bec_widgets_icon.png"),
size=QSize(48, 48),
)
app.setWindowIcon(icon)
win = QMainWindow()
win.setWindowTitle("BEC Widgets")
server = _start_server(args.id, gui_class, args.config)
gui = server.gui
win.setCentralWidget(gui)
win.resize(800, 600)
win.show()
app.aboutToQuit.connect(server.shutdown)
def sigint_handler(*args):
# display message, for people to let it terminate gracefully
print("Caught SIGINT, exiting")
app.quit()
signal.signal(signal.SIGINT, sigint_handler)
signal.signal(signal.SIGTERM, sigint_handler)
sys.exit(app.exec())
if __name__ == "__main__":
# import sys
# sys.argv = ["bec_widgets", "--gui_class", "MainWindow"]
if __name__ == "__main__": # pragma: no cover
sys.argv = ["bec_widgets.cli.server", "--id", "e2860", "--gui_class", "BECDockArea"]
main()

View File

@@ -3,11 +3,12 @@ import os
import numpy as np
import pyqtgraph as pg
from bec_qthemes import material_icon
from qtpy.QtCore import QSize
from qtpy.QtGui import QIcon
from qtpy.QtWidgets import (
QApplication,
QGroupBox,
QHBoxLayout,
QPushButton,
QSplitter,
QTabWidget,
QVBoxLayout,
@@ -15,16 +16,10 @@ from qtpy.QtWidgets import (
)
from bec_widgets.utils import BECDispatcher
from bec_widgets.utils.widget_io import WidgetHierarchy as wh
from bec_widgets.widgets.containers.dock import BECDockArea
from bec_widgets.widgets.containers.layout_manager.layout_manager import LayoutManagerWidget
from bec_widgets.widgets.editors.jupyter_console.jupyter_console import BECJupyterConsole
from bec_widgets.widgets.plots.image.image import Image
from bec_widgets.widgets.plots.motor_map.motor_map import MotorMap
from bec_widgets.widgets.plots.multi_waveform.multi_waveform import MultiWaveform
from bec_widgets.widgets.plots.plot_base import PlotBase
from bec_widgets.widgets.plots.scatter_waveform.scatter_waveform import ScatterWaveform
from bec_widgets.widgets.plots.waveform.waveform import Waveform
from bec_widgets.utils.colors import apply_theme
from bec_widgets.widgets.dock.dock_area import BECDockArea
from bec_widgets.widgets.figure import BECFigure
from bec_widgets.widgets.jupyter_console.jupyter_console import BECJupyterConsole
class JupyterConsoleWindow(QWidget): # pragma: no cover:
@@ -41,24 +36,26 @@ class JupyterConsoleWindow(QWidget): # pragma: no cover:
{
"np": np,
"pg": pg,
"wh": wh,
"fig": self.figure,
"dock": self.dock,
# "im": self.im,
# "mi": self.mi,
# "mm": self.mm,
# "lm": self.lm,
# "btn1": self.btn1,
# "btn2": self.btn2,
# "btn3": self.btn3,
# "btn4": self.btn4,
# "btn5": self.btn5,
# "btn6": self.btn6,
# "pb": self.pb,
# "pi": self.pi,
# "wf": self.wf,
# "scatter": self.scatter,
# "scatter_mi": self.scatter,
# "mwf": self.mwf,
"w1": self.w1,
"w2": self.w2,
"w3": self.w3,
"w4": self.w4,
"w5": self.w5,
"w6": self.w6,
"w7": self.w7,
"w8": self.w8,
"w9": self.w9,
"w10": self.w10,
"d0": self.d0,
"d1": self.d1,
"d2": self.d2,
"wave": self.wf,
# "bar": self.bar,
# "cm": self.colormap,
"im": self.im,
"mm": self.mm,
}
)
@@ -77,85 +74,117 @@ class JupyterConsoleWindow(QWidget): # pragma: no cover:
first_tab_layout.addWidget(self.dock)
tab_widget.addTab(first_tab, "Dock Area")
# third_tab = QWidget()
# third_tab_layout = QVBoxLayout(third_tab)
# self.lm = LayoutManagerWidget()
# third_tab_layout.addWidget(self.lm)
# tab_widget.addTab(third_tab, "Layout Manager Widget")
#
# fourth_tab = QWidget()
# fourth_tab_layout = QVBoxLayout(fourth_tab)
# self.pb = PlotBase()
# self.pi = self.pb.plot_item
# fourth_tab_layout.addWidget(self.pb)
# tab_widget.addTab(fourth_tab, "PlotBase")
#
# tab_widget.setCurrentIndex(3)
#
second_tab = QWidget()
second_tab_layout = QVBoxLayout(second_tab)
self.figure = BECFigure(parent=self, gui_id="figure")
second_tab_layout.addWidget(self.figure)
tab_widget.addTab(second_tab, "BEC Figure")
group_box = QGroupBox("Jupyter Console", splitter)
group_box_layout = QVBoxLayout(group_box)
self.console = BECJupyterConsole(inprocess=True)
group_box_layout.addWidget(self.console)
#
# # Some buttons for layout testing
# self.btn1 = QPushButton("Button 1")
# self.btn2 = QPushButton("Button 2")
# self.btn3 = QPushButton("Button 3")
# self.btn4 = QPushButton("Button 4")
# self.btn5 = QPushButton("Button 5")
# self.btn6 = QPushButton("Button 6")
#
# fifth_tab = QWidget()
# fifth_tab_layout = QVBoxLayout(fifth_tab)
# self.wf = Waveform()
# fifth_tab_layout.addWidget(self.wf)
# tab_widget.addTab(fifth_tab, "Waveform Next Gen")
# tab_widget.setCurrentIndex(4)
#
# sixth_tab = QWidget()
# sixth_tab_layout = QVBoxLayout(sixth_tab)
# self.im = Image()
# self.mi = self.im.main_image
# sixth_tab_layout.addWidget(self.im)
# tab_widget.addTab(sixth_tab, "Image Next Gen")
# tab_widget.setCurrentIndex(5)
#
# seventh_tab = QWidget()
# seventh_tab_layout = QVBoxLayout(seventh_tab)
# self.scatter = ScatterWaveform()
# self.scatter_mi = self.scatter.main_curve
# self.scatter.plot("samx", "samy", "bpm4i")
# seventh_tab_layout.addWidget(self.scatter)
# tab_widget.addTab(seventh_tab, "Scatter Waveform")
# tab_widget.setCurrentIndex(6)
#
# eighth_tab = QWidget()
# eighth_tab_layout = QVBoxLayout(eighth_tab)
# self.mm = MotorMap()
# eighth_tab_layout.addWidget(self.mm)
# tab_widget.addTab(eighth_tab, "Motor Map")
# tab_widget.setCurrentIndex(7)
#
# ninth_tab = QWidget()
# ninth_tab_layout = QVBoxLayout(ninth_tab)
# self.mwf = MultiWaveform()
# ninth_tab_layout.addWidget(self.mwf)
# tab_widget.addTab(ninth_tab, "MultiWaveform")
# tab_widget.setCurrentIndex(8)
#
# # add stuff to the new Waveform widget
# self._init_waveform()
#
# self.setWindowTitle("Jupyter Console Window")
def _init_waveform(self):
self.wf.plot(y_name="bpm4i", y_entry="bpm4i", dap="GaussianModel")
self.wf.plot(y_name="bpm3a", y_entry="bpm3a", dap="GaussianModel")
# add stuff to figure
self._init_figure()
# init dock for testing
self._init_dock()
self.setWindowTitle("Jupyter Console Window")
def _init_figure(self):
self.w1 = self.figure.plot(
x_name="samx",
y_name="bpm4i",
# title="Standard Plot with sync device, custom labels - w1",
# x_label="Motor Position",
# y_label="Intensity (A.U.)",
row=0,
col=0,
)
self.w1.set(
title="Standard Plot with sync device, custom labels - w1",
x_label="Motor Position",
y_label="Intensity (A.U.)",
)
self.w2 = self.figure.motor_map("samx", "samy", row=0, col=1)
self.w3 = self.figure.image(
"eiger", color_map="viridis", vrange=(0, 100), title="Eiger Image - w3", row=0, col=2
)
self.w4 = self.figure.plot(
x_name="samx",
y_name="samy",
z_name="bpm4i",
color_map_z="magma",
new=True,
title="2D scatter plot - w4",
row=0,
col=3,
)
self.w5 = self.figure.plot(
y_name="bpm4i",
new=True,
title="Best Effort Plot - w5",
dap="GaussianModel",
row=1,
col=0,
)
self.w6 = self.figure.plot(
x_name="timestamp", y_name="bpm4i", new=True, title="Timestamp Plot - w6", row=1, col=1
)
self.w7 = self.figure.plot(
x_name="index", y_name="bpm4i", new=True, title="Index Plot - w7", row=1, col=2
)
self.w8 = self.figure.plot(
y_name="monitor_async", new=True, title="Async Plot - Best Effort - w8", row=2, col=0
)
self.w9 = self.figure.plot(
x_name="timestamp",
y_name="monitor_async",
new=True,
title="Async Plot - timestamp - w9",
row=2,
col=1,
)
self.w10 = self.figure.plot(
x_name="index",
y_name="monitor_async",
new=True,
title="Async Plot - index - w10",
row=2,
col=2,
)
def _init_dock(self):
self.d0 = self.dock.add_dock(name="dock_0")
self.mm = self.d0.add_widget("BECMotorMapWidget")
self.mm.change_motors("samx", "samy")
self.d1 = self.dock.add_dock(name="dock_1", position="right")
self.im = self.d1.add_widget("BECImageWidget")
self.im.image("waveform", "1d")
self.d2 = self.dock.add_dock(name="dock_2", position="bottom")
self.wf = self.d2.add_widget("BECWaveformWidget", row=0, col=0)
self.wf.plot(x_name="samx", y_name="bpm3a")
self.wf.plot(x_name="samx", y_name="bpm4i", dap="GaussianModel")
# self.bar = self.d2.add_widget("RingProgressBar", row=0, col=1)
# self.bar.set_diameter(200)
# self.d3 = self.dock.add_dock(name="dock_3", position="bottom")
# self.colormap = pg.GradientWidget()
# self.d3.add_widget(self.colormap, row=0, col=0)
self.dock.save_state()
def closeEvent(self, event):
"""Override to handle things when main window is closed."""
self.dock.cleanup()
self.dock.close()
self.figure.cleanup()
self.figure.close()
self.console.close()
super().closeEvent(event)
@@ -171,16 +200,16 @@ if __name__ == "__main__": # pragma: no cover
app = QApplication(sys.argv)
app.setApplicationName("Jupyter Console")
app.setApplicationDisplayName("Jupyter Console")
icon = material_icon("terminal", color=(255, 255, 255, 255), filled=True)
apply_theme("dark")
icon = material_icon("terminal", color="#434343", filled=True)
app.setWindowIcon(icon)
bec_dispatcher = BECDispatcher(gui_id="jupyter_console")
bec_dispatcher = BECDispatcher()
client = bec_dispatcher.client
client.start()
win = JupyterConsoleWindow()
win.show()
win.resize(1500, 800)
app.aboutToQuit.connect(win.close)
sys.exit(app.exec_())

View File

@@ -6,7 +6,7 @@ from qtpy.QtGui import QAction
from qtpy.QtWidgets import QDialog, QDialogButtonBox, QVBoxLayout
from bec_widgets.examples.plugin_example_pyside.tictactoe import TicTacToe
from bec_widgets.utils.error_popups import SafeSlot as Slot
from bec_widgets.qt_utils.error_popups import SafeSlot as Slot
class TicTacToeDialog(QDialog): # pragma: no cover

View File

@@ -266,5 +266,3 @@ class CompactPopupWidget(QWidget):
# to ensure proper resources cleanup
for child in self.container.findChildren(QWidget, options=Qt.FindDirectChildrenOnly):
child.close()
super().closeEvent(event)

View File

@@ -2,93 +2,9 @@ import functools
import sys
import traceback
from bec_lib.logger import bec_logger
from qtpy.QtCore import Property, QObject, Qt, Signal, Slot
from qtpy.QtCore import QObject, Qt, Signal, Slot
from qtpy.QtWidgets import QApplication, QMessageBox, QPushButton, QVBoxLayout, QWidget
logger = bec_logger.logger
def SafeProperty(prop_type, *prop_args, popup_error: bool = False, default=None, **prop_kwargs):
"""
Decorator to create a Qt Property with safe getter and setter so that
Qt Designer won't crash if an exception occurs in either method.
Args:
prop_type: The property type (e.g., str, bool, int, custom classes, etc.)
popup_error (bool): If True, show a popup for any error; otherwise, ignore or log silently.
default: Any default/fallback value to return if the getter raises an exception.
*prop_args, **prop_kwargs: Passed along to the underlying Qt Property constructor.
Usage:
@SafeProperty(int, default=-1)
def some_value(self) -> int:
# your getter logic
return ... # if an exception is raised, returns -1
@some_value.setter
def some_value(self, val: int):
# your setter logic
...
"""
def decorator(py_getter):
"""Decorator for the user's property getter function."""
@functools.wraps(py_getter)
def safe_getter(self_):
try:
return py_getter(self_)
except Exception:
# Identify which property function triggered error
prop_name = f"{py_getter.__module__}.{py_getter.__qualname__}"
error_msg = traceback.format_exc()
if popup_error:
ErrorPopupUtility().custom_exception_hook(*sys.exc_info(), popup_error=True)
logger.error(f"SafeProperty error in GETTER of '{prop_name}':\n{error_msg}")
return default
class PropertyWrapper:
"""
Intermediate wrapper used so that the user can optionally chain .setter(...).
"""
def __init__(self, getter_func):
# We store only our safe_getter in the wrapper
self.getter_func = safe_getter
def setter(self, setter_func):
"""Wraps the user-defined setter to handle errors safely."""
@functools.wraps(setter_func)
def safe_setter(self_, value):
try:
return setter_func(self_, value)
except Exception:
prop_name = f"{setter_func.__module__}.{setter_func.__qualname__}"
error_msg = traceback.format_exc()
if popup_error:
ErrorPopupUtility().custom_exception_hook(
*sys.exc_info(), popup_error=True
)
logger.error(f"SafeProperty error in SETTER of '{prop_name}':\n{error_msg}")
return
# Return the full read/write Property
return Property(prop_type, self.getter_func, safe_setter, *prop_args, **prop_kwargs)
def __call__(self):
"""
If user never calls `.setter(...)`, produce a read-only property.
"""
return Property(prop_type, self.getter_func, None, *prop_args, **prop_kwargs)
return PropertyWrapper(py_getter)
return decorator
def SafeSlot(*slot_args, **slot_kwargs): # pylint: disable=invalid-name
"""Function with args, acting like a decorator, applying "error_managed" decorator + Qt Slot
@@ -96,55 +12,17 @@ def SafeSlot(*slot_args, **slot_kwargs): # pylint: disable=invalid-name
'popup_error' keyword argument can be passed with boolean value if a dialog should pop up,
otherwise error display is left to the original exception hook
'verify_sender' keyword argument can be passed with boolean value if the sender should be verified
before executing the slot. If True, the slot will only execute if the sender is a QObject. This is
useful to prevent function calls from already deleted objects.
'raise_error' keyword argument can be passed with boolean value if the error should be raised
after the error is displayed. This is useful to propagate the error to the caller but should be used
with great care to avoid segfaults.
The keywords above are stored in a container which can be overridden by passing
'_override_slot_params' keyword argument with a dictionary containing the keywords to override.
This is useful to override the default behavior of the decorator for a specific function call.
"""
_slot_params = {
"popup_error": bool(slot_kwargs.pop("popup_error", False)),
"verify_sender": bool(slot_kwargs.pop("verify_sender", False)),
"raise_error": bool(slot_kwargs.pop("raise_error", False)),
}
popup_error = bool(slot_kwargs.pop("popup_error", False))
def error_managed(method):
@Slot(*slot_args, **slot_kwargs)
@functools.wraps(method)
def wrapper(*args, **kwargs):
_override_slot_params = kwargs.pop("_override_slot_params", {})
_slot_params.update(_override_slot_params)
try:
if not _slot_params["verify_sender"] or len(args) == 0:
return method(*args, **kwargs)
_instance = args[0]
if not isinstance(_instance, QObject):
return method(*args, **kwargs)
sender = _instance.sender()
if sender is None:
logger.info(
f"Sender is None for {method.__module__}.{method.__qualname__}, "
"skipping method call."
)
return
return method(*args, **kwargs)
except Exception:
slot_name = f"{method.__module__}.{method.__qualname__}"
error_msg = traceback.format_exc()
if _slot_params["popup_error"]:
ErrorPopupUtility().custom_exception_hook(*sys.exc_info(), popup_error=True)
logger.error(f"SafeSlot error in slot '{slot_name}':\n{error_msg}")
if _slot_params["raise_error"]:
raise
ErrorPopupUtility().custom_exception_hook(*sys.exc_info(), popup_error=popup_error)
return wrapper
@@ -213,12 +91,6 @@ class _ErrorPopupUtility(QObject):
msg.setMinimumHeight(400)
msg.exec_()
def show_property_error(self, title, message, widget):
"""
Show a property-specific error message.
"""
self.error_occurred.emit(title, message, widget)
def format_traceback(self, traceback_message: str) -> str:
"""
Format the traceback message to be displayed in the error popup by adding indentation to each line.
@@ -255,14 +127,12 @@ class _ErrorPopupUtility(QObject):
error_message = " ".join(captured_message)
return error_message
def get_error_message(self, exctype, value, tb):
return "".join(traceback.format_exception(exctype, value, tb))
def custom_exception_hook(self, exctype, value, tb, popup_error=False):
if popup_error or self.enable_error_popup:
error_message = traceback.format_exception(exctype, value, tb)
self.error_occurred.emit(
"Method error" if popup_error else "Application Error",
self.get_error_message(exctype, value, tb),
"".join(error_message),
self.parent(),
)
else:

View File

@@ -13,7 +13,7 @@ from qtpy.QtWidgets import (
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.colors import get_accent_colors, get_theme_palette
from bec_widgets.widgets.utility.visual.dark_mode_button.dark_mode_button import DarkModeButton
from bec_widgets.widgets.dark_mode_button.dark_mode_button import DarkModeButton
class PaletteViewer(BECWidget, QWidget):
@@ -22,10 +22,10 @@ class PaletteViewer(BECWidget, QWidget):
"""
ICON_NAME = "palette"
RPC = False
def __init__(self, *args, parent=None, **kwargs):
super().__init__(parent=parent, theme_update=True, **kwargs)
super().__init__(*args, theme_update=True, **kwargs)
QWidget.__init__(self, parent=parent)
self.setFixedSize(400, 600)
layout = QVBoxLayout(self)
dark_mode_button = DarkModeButton(self)

View File

@@ -1,10 +1,6 @@
from bec_lib.logger import bec_logger
from PySide6.QtGui import QCloseEvent
from qtpy.QtWidgets import QDialog, QDialogButtonBox, QHBoxLayout, QPushButton, QVBoxLayout, QWidget
from bec_widgets.utils.error_popups import SafeSlot
logger = bec_logger.logger
from bec_widgets.qt_utils.error_popups import SafeSlot as Slot
class SettingWidget(QWidget):
@@ -24,14 +20,14 @@ class SettingWidget(QWidget):
def set_target_widget(self, target_widget: QWidget):
self.target_widget = target_widget
@SafeSlot()
@Slot()
def accept_changes(self):
"""
Accepts the changes made in the settings widget and applies them to the target widget.
"""
pass
@SafeSlot(dict)
@Slot(dict)
def display_current_settings(self, config_dict: dict):
"""
Displays the current settings of the target widget in the settings widget.
@@ -41,15 +37,6 @@ class SettingWidget(QWidget):
"""
pass
def cleanup(self):
"""
Cleanup the settings widget.
"""
def closeEvent(self, event: QCloseEvent) -> None:
self.cleanup()
return super().closeEvent(event)
class SettingsDialog(QDialog):
"""
@@ -67,13 +54,12 @@ class SettingsDialog(QDialog):
settings_widget: SettingWidget = None,
window_title: str = "Settings",
config: dict = None,
modal: bool = False,
*args,
**kwargs,
):
super().__init__(parent, *args, **kwargs)
self.setModal(modal)
self.setModal(False)
self.setWindowTitle(window_title)
@@ -106,24 +92,15 @@ class SettingsDialog(QDialog):
ok_button.setDefault(True)
ok_button.setAutoDefault(True)
@SafeSlot()
@Slot()
def accept(self):
"""
Accept the changes made in the settings widget and close the dialog.
"""
self.widget.accept_changes()
self.cleanup()
super().accept()
@SafeSlot()
def reject(self):
"""
Reject the changes made in the settings widget and close the dialog.
"""
self.cleanup()
super().reject()
@SafeSlot()
@Slot()
def apply_changes(self):
"""
Apply the changes made in the settings widget without closing the dialog.
@@ -136,10 +113,7 @@ class SettingsDialog(QDialog):
"""
self.button_box.close()
self.button_box.deleteLater()
self.widget.close()
self.widget.deleteLater()
def closeEvent(self, event):
logger.info("Closing settings dialog")
self.cleanup()
super().closeEvent(event)

View File

@@ -0,0 +1,296 @@
# pylint: disable=no-name-in-module
from __future__ import annotations
import os
from abc import ABC, abstractmethod
from collections import defaultdict
from typing import Literal
from bec_qthemes._icon.material_icons import material_icon
from qtpy.QtCore import QSize, Qt
from qtpy.QtGui import QAction, QColor, QIcon
from qtpy.QtWidgets import (
QComboBox,
QHBoxLayout,
QLabel,
QMenu,
QSizePolicy,
QToolBar,
QToolButton,
QWidget,
)
import bec_widgets
MODULE_PATH = os.path.dirname(bec_widgets.__file__)
class ToolBarAction(ABC):
"""
Abstract base class for toolbar actions.
Args:
icon_path (str, optional): The name of the icon file from `assets/toolbar_icons`. Defaults to None.
tooltip (bool, optional): The tooltip for the action. Defaults to None.
checkable (bool, optional): Whether the action is checkable. Defaults to False.
"""
def __init__(self, icon_path: str = None, tooltip: str = None, checkable: bool = False):
self.icon_path = (
os.path.join(MODULE_PATH, "assets", "toolbar_icons", icon_path) if icon_path else None
)
self.tooltip = tooltip
self.checkable = checkable
self.action = None
@abstractmethod
def add_to_toolbar(self, toolbar: QToolBar, target: QWidget):
"""Adds an action or widget to a toolbar.
Args:
toolbar (QToolBar): The toolbar to add the action or widget to.
target (QWidget): The target widget for the action.
"""
class SeparatorAction(ToolBarAction):
"""Separator action for the toolbar."""
def add_to_toolbar(self, toolbar: QToolBar, target: QWidget):
toolbar.addSeparator()
class IconAction(ToolBarAction):
"""
Action with an icon for the toolbar.
Args:
icon_path (str): The path to the icon file.
tooltip (str): The tooltip for the action.
checkable (bool, optional): Whether the action is checkable. Defaults to False.
"""
def __init__(self, icon_path: str = None, tooltip: str = None, checkable: bool = False):
super().__init__(icon_path, tooltip, checkable)
def add_to_toolbar(self, toolbar: QToolBar, target: QWidget):
icon = QIcon()
icon.addFile(self.icon_path, size=QSize(20, 20))
self.action = QAction(icon, self.tooltip, target)
self.action.setCheckable(self.checkable)
toolbar.addAction(self.action)
class MaterialIconAction:
"""
Action with a Material icon for the toolbar.
Args:
icon_path (str, optional): The name of the Material icon. Defaults to None.
tooltip (bool, optional): The tooltip for the action. Defaults to None.
checkable (bool, optional): Whether the action is checkable. Defaults to False.
filled (bool, optional): Whether the icon is filled. Defaults to False.
"""
def __init__(
self,
icon_name: str = None,
tooltip: str = None,
checkable: bool = False,
filled: bool = False,
color: str | tuple | QColor | dict[Literal["dark", "light"], str] | None = None,
):
self.icon_name = icon_name
self.tooltip = tooltip
self.checkable = checkable
self.action = None
self.filled = filled
self.color = color
def add_to_toolbar(self, toolbar: QToolBar, target: QWidget):
icon = self.get_icon()
self.action = QAction(icon, self.tooltip, target)
self.action.setCheckable(self.checkable)
toolbar.addAction(self.action)
def get_icon(self):
icon = material_icon(
self.icon_name,
size=(20, 20),
convert_to_pixmap=False,
filled=self.filled,
color=self.color,
)
return icon
class DeviceSelectionAction(ToolBarAction):
"""
Action for selecting a device in a combobox.
Args:
label (str): The label for the combobox.
device_combobox (DeviceComboBox): The combobox for selecting the device.
"""
def __init__(self, label: str, device_combobox):
super().__init__()
self.label = label
self.device_combobox = device_combobox
self.device_combobox.currentIndexChanged.connect(lambda: self.set_combobox_style("#ffa700"))
def add_to_toolbar(self, toolbar, target):
widget = QWidget()
layout = QHBoxLayout(widget)
label = QLabel(f"{self.label}")
layout.addWidget(label)
layout.addWidget(self.device_combobox)
toolbar.addWidget(widget)
def set_combobox_style(self, color: str):
self.device_combobox.setStyleSheet(f"QComboBox {{ background-color: {color}; }}")
class WidgetAction(ToolBarAction):
"""
Action for adding any widget to the toolbar.
Args:
label (str|None): The label for the widget.
widget (QWidget): The widget to be added to the toolbar.
"""
def __init__(self, label: str | None = None, widget: QWidget = None, parent=None):
super().__init__(parent)
self.label = label
self.widget = widget
def add_to_toolbar(self, toolbar: QToolBar, target: QWidget):
container = QWidget()
layout = QHBoxLayout(container)
layout.setContentsMargins(0, 0, 0, 0)
layout.setSpacing(5)
if self.label is not None:
label_widget = QLabel(f"{self.label}")
label_widget.setSizePolicy(QSizePolicy.Minimum, QSizePolicy.Fixed)
label_widget.setAlignment(Qt.AlignVCenter | Qt.AlignRight)
layout.addWidget(label_widget)
if isinstance(self.widget, QComboBox):
self.widget.setSizeAdjustPolicy(QComboBox.AdjustToContents)
size_policy = QSizePolicy(QSizePolicy.Expanding, QSizePolicy.Fixed)
self.widget.setSizePolicy(size_policy)
self.widget.setMinimumWidth(self.calculate_minimum_width(self.widget))
else:
self.widget.setSizePolicy(QSizePolicy.Preferred, QSizePolicy.Fixed)
layout.addWidget(self.widget)
toolbar.addWidget(container)
@staticmethod
def calculate_minimum_width(combo_box: QComboBox) -> int:
"""
Calculate the minimum width required to display the longest item in the combo box.
Args:
combo_box (QComboBox): The combo box to calculate the width for.
Returns:
int: The calculated minimum width in pixels.
"""
font_metrics = combo_box.fontMetrics()
max_width = max(font_metrics.width(combo_box.itemText(i)) for i in range(combo_box.count()))
return max_width + 60
class ExpandableMenuAction(ToolBarAction):
"""
Action for an expandable menu in the toolbar.
Args:
label (str): The label for the menu.
actions (dict): A dictionary of actions to populate the menu.
icon_path (str, optional): The path to the icon file. Defaults to None.
"""
def __init__(self, label: str, actions: dict, icon_path: str = None):
super().__init__(icon_path, label)
self.actions = actions
self.widgets = defaultdict(dict)
def add_to_toolbar(self, toolbar: QToolBar, target: QWidget):
button = QToolButton(toolbar)
if self.icon_path:
button.setIcon(QIcon(self.icon_path))
button.setText(self.tooltip)
button.setPopupMode(QToolButton.InstantPopup)
button.setStyleSheet(
"""
QToolButton {
font-size: 14px;
}
QMenu {
font-size: 14px;
}
"""
)
menu = QMenu(button)
for action_id, action in self.actions.items():
sub_action = QAction(action.tooltip, target)
if hasattr(action, "icon_path"):
icon = QIcon()
icon.addFile(action.icon_path, size=QSize(20, 20))
sub_action.setIcon(icon)
elif hasattr(action, "get_icon"):
sub_action.setIcon(action.get_icon())
sub_action.setCheckable(action.checkable)
menu.addAction(sub_action)
self.widgets[action_id] = sub_action
button.setMenu(menu)
toolbar.addWidget(button)
class ModularToolBar(QToolBar):
"""Modular toolbar with optional automatic initialization.
Args:
parent (QWidget, optional): The parent widget of the toolbar. Defaults to None.
actions (list[ToolBarAction], optional): A list of action creators to populate the toolbar. Defaults to None.
target_widget (QWidget, optional): The widget that the actions will target. Defaults to None.
"""
def __init__(self, parent=None, actions: dict | None = None, target_widget=None):
super().__init__(parent)
self.widgets = defaultdict(dict)
self.set_background_color()
if actions is not None and target_widget is not None:
self.populate_toolbar(actions, target_widget)
def populate_toolbar(self, actions: dict, target_widget):
"""Populates the toolbar with a set of actions.
Args:
actions (list[ToolBarAction]): A list of action creators to populate the toolbar.
target_widget (QWidget): The widget that the actions will target.
"""
self.clear()
for action_id, action in actions.items():
action.add_to_toolbar(self, target_widget)
self.widgets[action_id] = action
def set_background_color(self):
self.setIconSize(QSize(20, 20))
self.setMovable(False)
self.setFloatable(False)
self.setContentsMargins(0, 0, 0, 0)
self.setStyleSheet("QToolBar { background-color: rgba(0, 0, 0, 0); border: none; }")

View File

@@ -224,11 +224,3 @@ DEVICES = [
Positioner("test", limits=[-10, 10], read_value=2.0),
Device("test_device"),
]
def check_remote_data_size(widget, plot_name, num_elements):
"""
Check if the remote data has the correct number of elements.
Used in the qtbot.waitUntil function.
"""
return len(widget.get_all_data()[plot_name]["x"]) == num_elements

View File

@@ -3,29 +3,22 @@ from __future__ import annotations
import os
import time
import traceback
import uuid
from datetime import datetime
from typing import TYPE_CHECKING, Optional
from typing import Optional
from bec_lib.logger import bec_logger
from bec_lib.utils.import_utils import lazy_import_from
from pydantic import BaseModel, Field, field_validator
from qtpy.QtCore import QObject, QRunnable, QThreadPool, QTimer, Signal
from qtpy.QtCore import QObject, QRunnable, QThreadPool, Signal
from qtpy.QtWidgets import QApplication
from bec_widgets.cli.rpc.rpc_register import RPCRegister
from bec_widgets.utils.error_popups import ErrorPopupUtility, SafeSlot
from bec_widgets.utils.widget_io import WidgetHierarchy
from bec_widgets.cli.rpc_register import RPCRegister
from bec_widgets.qt_utils.error_popups import ErrorPopupUtility
from bec_widgets.qt_utils.error_popups import SafeSlot as pyqtSlot
from bec_widgets.utils.yaml_dialog import load_yaml, load_yaml_gui, save_yaml, save_yaml_gui
if TYPE_CHECKING: # pragma: no cover
from bec_widgets.utils.bec_dispatcher import BECDispatcher
from bec_widgets.widgets.containers.dock import BECDock
else:
BECDispatcher = lazy_import_from("bec_widgets.utils.bec_dispatcher", ("BECDispatcher",))
logger = bec_logger.logger
BECDispatcher = lazy_import_from("bec_widgets.utils.bec_dispatcher", ("BECDispatcher",))
class ConnectionConfig(BaseModel):
@@ -43,7 +36,8 @@ class ConnectionConfig(BaseModel):
"""Generate a GUI ID if none is provided."""
if v is None:
widget_class = values.data["widget_class"]
v = f"{widget_class}_{datetime.now().strftime('%Y_%m_%d_%H_%M_%S_%f')}"
v = f"{widget_class}_{str(time.time())}"
return v
return v
@@ -75,55 +69,18 @@ class Worker(QRunnable):
class BECConnector:
"""Connection mixin class to handle BEC client and device manager"""
USER_ACCESS = ["_config_dict", "_get_all_rpc", "_rpc_id"]
USER_ACCESS = ["_config_dict", "_get_all_rpc"]
EXIT_HANDLERS = {}
def __init__(
self,
client=None,
config: ConnectionConfig | None = None,
gui_id: str | None = None,
object_name: str | None = None,
parent_dock: BECDock | None = None, # TODO should go away -> issue created #473
root_widget: bool = False,
**kwargs,
):
"""
BECConnector mixin class to handle BEC client and device manager.
Args:
client(BECClient, optional): The BEC client.
config(ConnectionConfig, optional): The connection configuration with specific gui id.
gui_id(str, optional): The GUI ID.
object_name(str, optional): The object name.
parent_dock(BECDock, optional): The parent dock.# TODO should go away -> issue created #473
root_widget(bool, optional): If set to True, the parent_id will be always set to None, thus enforcing that the widget is accessible as a root widget of the BECGuiClient object.
**kwargs:
"""
# Extract object_name from kwargs to not pass it to Qt class
object_name = object_name or kwargs.pop("objectName", None)
# Ensure the parent is always the first argument for QObject
parent = kwargs.pop("parent", None)
# This initializes the QObject or any qt related class BECConnector has to be used from this line down with QObject, otherwise hierarchy logic will not work
super().__init__(parent=parent, **kwargs)
assert isinstance(
self, QObject
), "BECConnector must be used with a QObject or any qt related class."
# flag to check if the object was destroyed and its cleanup was called
self._destroyed = False
def __init__(self, client=None, config: ConnectionConfig = None, gui_id: str = None):
# BEC related connections
self.bec_dispatcher = BECDispatcher(client=client)
self.client = self.bec_dispatcher.client if client is None else client
self._parent_dock = parent_dock # TODO also remove at some point -> issue created #473
self.rpc_register = RPCRegister()
if not self.client in BECConnector.EXIT_HANDLERS:
# register function to clean connections at exit;
# the function depends on BECClient, and BECDispatcher
@SafeSlot()
@pyqtSlot()
def terminate(client=self.client, dispatcher=self.bec_dispatcher):
logger.info("Disconnecting", repr(dispatcher))
dispatcher.disconnect_all()
@@ -143,147 +100,24 @@ class BECConnector:
)
self.config = ConnectionConfig(widget_class=self.__class__.__name__)
# If the gui_id is passed, it should be respected. However, this should be revisted since
# the gui_id has to be unique, and may no longer be.
if gui_id:
self.config.gui_id = gui_id
self.gui_id: str = gui_id # Keep namespace in sync
self.gui_id = gui_id
else:
self.gui_id: str = self.config.gui_id # type: ignore
self.gui_id = self.config.gui_id
if object_name is not None:
self.setObjectName(object_name)
# 1) If no objectName is set, set the initial name
if not self.objectName():
self.setObjectName(self.__class__.__name__)
self.object_name = self.objectName()
# 2) Enforce unique objectName among siblings with the same BECConnector parent
self.setParent(parent)
if isinstance(self.parent(), QObject) and hasattr(self, "cleanup"):
self.parent().destroyed.connect(self._run_cleanup_on_deleted_parent)
# register widget to rpc register
# be careful: when registering, and the object is not a BECWidget,
# cleanup has to called manually since there is no 'closeEvent'
self.rpc_register = RPCRegister()
self.rpc_register.add_rpc(self)
# Error popups
self.error_utility = ErrorPopupUtility()
self._thread_pool = QThreadPool.globalInstance()
# Store references to running workers so they're not garbage collected prematurely.
self._workers = []
# If set to True, the parent_id will be always set to None, thus enforcing that the widget is accessible as a root widget of the BECGuiClient object.
self.root_widget = root_widget
QTimer.singleShot(0, self._update_object_name)
@property
def parent_id(self) -> str | None:
try:
if self.root_widget:
return None
connector_parent = WidgetHierarchy._get_becwidget_ancestor(self)
return connector_parent.gui_id if connector_parent else None
except:
logger.error(f"Error getting parent_id for {self.__class__.__name__}")
@SafeSlot()
def _run_cleanup_on_deleted_parent(self) -> None:
"""
Run cleanup on the deleted parent.
This method is called when the parent is deleted.
"""
if not hasattr(self, "cleanup"):
return
try:
if not self._destroyed:
self.cleanup()
self._destroyed = True
except Exception:
content = traceback.format_exc()
logger.info(
"Failed to run cleanup on deleted parent. "
f"This is not necessarily an error as the parent may be deleted before the child and includes already a cleanup. The following exception was raised:\n{content}"
)
def change_object_name(self, name: str) -> None:
"""
Change the object name of the widget. Unregister old name and register the new one.
Args:
name (str): The new object name.
"""
self.rpc_register.remove_rpc(self)
self.setObjectName(name.replace("-", "_").replace(" ", "_"))
QTimer.singleShot(0, self._update_object_name)
def _update_object_name(self) -> None:
"""
Enforce a unique object name among siblings and register the object for RPC.
This method is called through a single shot timer kicked off in the constructor.
"""
# 1) Enforce unique objectName among siblings with the same BECConnector parent
self._enforce_unique_sibling_name()
# 2) Register the object for RPC
self.rpc_register.add_rpc(self)
def _enforce_unique_sibling_name(self):
"""
Enforce that this BECConnector has a unique objectName among its siblings.
Sibling logic:
- If there's a nearest BECConnector parent, only compare with children of that parent.
- If parent is None (i.e., top-level object), compare with all other top-level BECConnectors.
"""
QApplication.processEvents()
parent_bec = WidgetHierarchy._get_becwidget_ancestor(self)
if parent_bec:
# We have a parent => only compare with siblings under that parent
siblings = parent_bec.findChildren(BECConnector)
else:
# No parent => treat all top-level BECConnectors as siblings
# 1) Gather all BECConnectors from QApplication
all_widgets = QApplication.allWidgets()
all_bec = [w for w in all_widgets if isinstance(w, BECConnector)]
# 2) "Top-level" means closest BECConnector parent is None
top_level_bec = [
w for w in all_bec if WidgetHierarchy._get_becwidget_ancestor(w) is None
]
# 3) We are among these top-level siblings
siblings = top_level_bec
# Collect used names among siblings
used_names = {sib.objectName() for sib in siblings if sib is not self}
base_name = self.object_name
if base_name not in used_names:
# Name is already unique among siblings
return
# Need a suffix to avoid collision
counter = 0
while True:
trial_name = f"{base_name}_{counter}"
if trial_name not in used_names:
self.setObjectName(trial_name)
self.object_name = trial_name
break
counter += 1
# pylint: disable=invalid-name
def setObjectName(self, name: str) -> None:
"""
Set the object name of the widget.
Args:
name (str): The new object name.
"""
super().setObjectName(name)
self.object_name = name
if self.rpc_register.object_is_registered(self):
self.rpc_register.broadcast()
def submit_task(self, fn, *args, on_complete: SafeSlot = None, **kwargs) -> Worker:
def submit_task(self, fn, *args, on_complete: pyqtSlot = None, **kwargs) -> Worker:
"""
Submit a task to run in a separate thread. The task will run the specified
function with the provided arguments and emit the completed signal when done.
@@ -310,14 +144,11 @@ class BECConnector:
>>> def on_complete():
>>> print("Task complete")
>>> self.submit_task(my_function, 1, 2, on_complete=on_complete)
"""
worker = Worker(fn, *args, **kwargs)
if on_complete:
worker.signals.completed.connect(on_complete)
# Keep a reference to the worker so it is not garbage collected.
self._workers.append(worker)
# When the worker is done, remove it from our list.
worker.signals.completed.connect(lambda: self._workers.remove(worker))
self._thread_pool.start(worker)
return worker
@@ -349,39 +180,37 @@ class BECConnector:
@_config_dict.setter
def _config_dict(self, config: BaseModel) -> None:
"""
Set the configuration of the widget.
Get the configuration of the widget.
Args:
config (BaseModel): The new configuration model.
Returns:
dict: The configuration of the widget.
"""
self.config = config
# FIXME some thoughts are required to decide how thhis should work with rpc registry
def apply_config(self, config: dict, generate_new_id: bool = True) -> None:
"""
Apply the configuration to the widget.
Args:
config (dict): Configuration settings.
generate_new_id (bool): If True, generate a new GUI ID for the widget.
config(dict): Configuration settings.
generate_new_id(bool): If True, generate a new GUI ID for the widget.
"""
self.config = ConnectionConfig(**config)
if generate_new_id is True:
gui_id = str(uuid.uuid4())
self.rpc_register.remove_rpc(self)
self._set_gui_id(gui_id)
self.set_gui_id(gui_id)
self.rpc_register.add_rpc(self)
else:
self.gui_id = self.config.gui_id
# FIXME some thoughts are required to decide how thhis should work with rpc registry
def load_config(self, path: str | None = None, gui: bool = False):
"""
Load the configuration of the widget from YAML.
Args:
path (str | None): Path to the configuration file for non-GUI dialog mode.
gui (bool): If True, use the GUI dialog to load the configuration file.
path(str): Path to the configuration file for non-GUI dialog mode.
gui(bool): If True, use the GUI dialog to load the configuration file.
"""
if gui is True:
config = load_yaml_gui(self)
@@ -400,8 +229,8 @@ class BECConnector:
Save the configuration of the widget to YAML.
Args:
path (str | None): Path to save the configuration file for non-GUI dialog mode.
gui (bool): If True, use the GUI dialog to save the configuration file.
path(str): Path to save the configuration file for non-GUI dialog mode.
gui(bool): If True, use the GUI dialog to save the configuration file.
"""
if gui is True:
save_yaml_gui(self, self._config_dict)
@@ -409,15 +238,16 @@ class BECConnector:
if path is None:
path = os.getcwd()
file_path = os.path.join(path, f"{self.__class__.__name__}_config.yaml")
save_yaml(file_path, self._config_dict)
# @SafeSlot(str)
def _set_gui_id(self, gui_id: str) -> None:
@pyqtSlot(str)
def set_gui_id(self, gui_id: str) -> None:
"""
Set the GUI ID for the widget.
Args:
gui_id (str): GUI ID.
gui_id(str): GUI ID
"""
self.config.gui_id = gui_id
self.gui_id = gui_id
@@ -438,84 +268,36 @@ class BECConnector:
"""Update the client and device manager from BEC and create object for BEC shortcuts.
Args:
client: BEC client.
client: BEC client
"""
self.client = client
self.get_bec_shortcuts()
@SafeSlot(ConnectionConfig) # TODO can be also dict
@pyqtSlot(ConnectionConfig) # TODO can be also dict
def on_config_update(self, config: ConnectionConfig | dict) -> None:
"""
Update the configuration for the widget.
Args:
config (ConnectionConfig | dict): Configuration settings.
config(ConnectionConfig): Configuration settings.
"""
gui_id = getattr(config, "gui_id", None)
if isinstance(config, dict):
config = ConnectionConfig(**config)
self.config = config
if gui_id and config.gui_id != gui_id: # Recreating config should not overwrite the gui_id
self.config.gui_id = gui_id
# TODO add error handler
def remove(self):
"""Cleanup the BECConnector"""
# If the widget is attached to a dock, remove it from the dock.
# TODO this should be handled by dock and dock are not by BECConnector -> issue created #473
if self._parent_dock is not None:
self._parent_dock.delete(self.object_name)
# If the widget is from Qt, trigger its close method.
elif hasattr(self, "close"):
self.close()
# If the widget is neither from a Dock nor from Qt, remove it from the RPC registry.
# i.e. Curve Item from Waveform
else:
self.rpc_register.remove_rpc(self)
self.config = config
def get_config(self, dict_output: bool = True) -> dict | BaseModel:
"""
Get the configuration of the widget.
Args:
dict_output (bool): If True, return the configuration as a dictionary.
If False, return the configuration as a pydantic model.
dict_output(bool): If True, return the configuration as a dictionary. If False, return the configuration as a pydantic model.
Returns:
dict | BaseModel: The configuration of the widget.
dict: The configuration of the plot widget.
"""
if dict_output:
return self.config.model_dump()
else:
return self.config
# --- Example usage of BECConnector: running a simple task ---
if __name__ == "__main__": # pragma: no cover
import sys
# Create a QApplication instance (required for QThreadPool)
app = QApplication(sys.argv)
connector = BECConnector()
def print_numbers():
"""
Task function that prints numbers 1 to 10 with a 0.5 second delay between each.
"""
for i in range(1, 11):
print(i)
time.sleep(0.5)
def task_complete():
"""
Called when the task is complete.
"""
print("Task complete")
# Exit the application after the task completes.
app.quit()
# Submit the task using the connector's submit_task method.
connector.submit_task(print_numbers, on_complete=task_complete)
# Start the Qt event loop.
sys.exit(app.exec_())

View File

@@ -10,15 +10,13 @@ from bec_qthemes import material_icon
from qtpy import PYSIDE6
from qtpy.QtGui import QIcon
from bec_widgets.utils.bec_plugin_helper import user_widget_plugin
if PYSIDE6:
from PySide6.scripts.pyside_tool import (
_extend_path_var,
init_virtual_env,
qt_tool_wrapper,
is_pyenv_python,
is_virtual_env,
qt_tool_wrapper,
ui_tool_binary,
)
@@ -78,7 +76,7 @@ def list_editable_packages() -> set[str]:
return editable_packages
def patch_designer(cmd_args: list[str] = []): # pragma: no cover
def patch_designer(): # pragma: no cover
if not PYSIDE6:
print("PYSIDE6 is not available in the environment. Cannot patch designer.")
return
@@ -95,31 +93,24 @@ def patch_designer(cmd_args: list[str] = []): # pragma: no cover
_extend_path_var("PATH", os.fspath(Path(sys._base_executable).parent), True)
else:
if sys.platform == "linux":
suffix = f"{sys.abiflags}.so"
env_var = "LD_PRELOAD"
current_pid = os.getpid()
with open(f"/proc/{current_pid}/maps", "rt") as f:
for line in f:
if "libpython" in line:
lib_path = line.split()[-1]
os.environ[env_var] = lib_path
break
elif sys.platform == "darwin":
suffix = ".dylib"
env_var = "DYLD_INSERT_LIBRARIES"
version = f"{major_version}.{minor_version}"
library_name = f"libpython{version}{suffix}"
lib_path = str(Path(sysconfig.get_config_var("LIBDIR")) / library_name)
os.environ[env_var] = lib_path
else:
raise RuntimeError(f"Unsupported platform: {sys.platform}")
version = f"{major_version}.{minor_version}"
library_name = f"libpython{version}{suffix}"
lib_path = str(Path(sysconfig.get_config_var("LIBDIR")) / library_name)
os.environ[env_var] = lib_path
if is_pyenv_python() or is_virtual_env():
# append all editable packages to the PYTHONPATH
editable_packages = list_editable_packages()
for pckg in editable_packages:
_extend_path_var("PYTHONPATH", pckg, True)
qt_tool_wrapper(ui_tool_binary("designer"), cmd_args)
qt_tool_wrapper(ui_tool_binary("designer"), sys.argv[1:])
def find_plugin_paths(base_path: Path):
@@ -147,24 +138,15 @@ def set_plugin_environment_variable(plugin_paths):
# Patch the designer function
def open_designer(cmd_args: list[str] = []): # pragma: no cover
def main(): # pragma: no cover
if not PYSIDE6:
print("PYSIDE6 is not available in the environment. Exiting...")
return
base_dir = Path(os.path.dirname(bec_widgets.__file__)).resolve()
plugin_paths = find_plugin_paths(base_dir)
if (plugin_repo := user_widget_plugin()) and isinstance(plugin_repo.__file__, str):
plugin_repo_dir = Path(os.path.dirname(plugin_repo.__file__)).resolve()
plugin_paths.extend(find_plugin_paths(plugin_repo_dir))
set_plugin_environment_variable(plugin_paths)
patch_designer(cmd_args)
def main():
open_designer(sys.argv[1:])
patch_designer()
if __name__ == "__main__": # pragma: no cover

View File

@@ -1,12 +1,9 @@
from __future__ import annotations
import collections
import random
import string
from collections.abc import Callable
from typing import TYPE_CHECKING, DefaultDict, Hashable, Union
from typing import TYPE_CHECKING, Union
import louie
import redis
from bec_lib.client import BECClient
from bec_lib.logger import bec_logger
@@ -15,52 +12,28 @@ from bec_lib.service_config import ServiceConfig
from qtpy.QtCore import QObject
from qtpy.QtCore import Signal as pyqtSignal
from bec_widgets.utils.serialization import register_serializer_extension
logger = bec_logger.logger
if TYPE_CHECKING: # pragma: no cover
if TYPE_CHECKING:
from bec_lib.endpoints import EndpointInfo
from bec_widgets.utils.rpc_server import RPCServer
class QtThreadSafeCallback(QObject):
"""QtThreadSafeCallback is a wrapper around a callback function to make it thread-safe for Qt."""
cb_signal = pyqtSignal(dict, dict)
def __init__(self, cb: Callable, cb_info: dict | None = None):
"""
Initialize the QtThreadSafeCallback.
Args:
cb (Callable): The callback function to be wrapped.
cb_info (dict, optional): Additional information about the callback. Defaults to None.
"""
def __init__(self, cb):
super().__init__()
self.cb_info = cb_info
self.cb = cb
self.cb_ref = louie.saferef.safe_ref(cb)
self.cb_signal.connect(self.cb)
self.topics = set()
def __hash__(self):
# make 2 differents QtThreadSafeCallback to look
# identical when used as dictionary keys, if the
# callback is the same
return f"{id(self.cb_ref)}{self.cb_info}".__hash__()
def __eq__(self, other):
if not isinstance(other, QtThreadSafeCallback):
return False
return self.cb_ref == other.cb_ref and self.cb_info == other.cb_info
return id(self.cb)
def __call__(self, msg_content, metadata):
if self.cb_ref() is None:
# callback has been deleted
return
self.cb_signal.emit(msg_content, metadata)
@@ -100,29 +73,18 @@ class BECDispatcher:
_instance = None
_initialized = False
client: BECClient
cli_server: RPCServer | None = None
def __new__(
cls,
client=None,
config: str | ServiceConfig | None = None,
gui_id: str | None = None,
*args,
**kwargs,
):
def __new__(cls, client=None, config: str = None, *args, **kwargs):
if cls._instance is None:
cls._instance = super(BECDispatcher, cls).__new__(cls)
cls._initialized = False
return cls._instance
def __init__(self, client=None, config: str | ServiceConfig | None = None, gui_id: str = None):
def __init__(self, client=None, config: str | ServiceConfig = None):
if self._initialized:
return
self._registered_slots: DefaultDict[Hashable, QtThreadSafeCallback] = (
collections.defaultdict()
)
self._slots = collections.defaultdict(set)
self.client = client
if self.client is None:
@@ -145,18 +107,11 @@ class BECDispatcher:
except redis.exceptions.ConnectionError:
logger.warning("Could not connect to Redis, skipping start of BECClient.")
register_serializer_extension()
logger.success("Initialized BECDispatcher")
self.start_cli_server(gui_id=gui_id)
self._initialized = True
@classmethod
def reset_singleton(cls):
"""
Reset the singleton instance of the BECDispatcher.
"""
cls._instance = None
cls._initialized = False
@@ -164,7 +119,6 @@ class BECDispatcher:
self,
slot: Callable,
topics: Union[EndpointInfo, str, list[Union[EndpointInfo, str]]],
cb_info: dict | None = None,
**kwargs,
) -> None:
"""Connect widget's qt slot, so that it is called on new pub/sub topic message.
@@ -173,15 +127,11 @@ class BECDispatcher:
slot (Callable): A slot method/function that accepts two inputs: content and metadata of
the corresponding pub/sub message
topics (EndpointInfo | str | list): A topic or list of topics that can typically be acquired via bec_lib.MessageEndpoints
cb_info (dict | None): A dictionary containing information about the callback. Defaults to None.
"""
qt_slot = QtThreadSafeCallback(cb=slot, cb_info=cb_info)
if qt_slot not in self._registered_slots:
self._registered_slots[qt_slot] = qt_slot
qt_slot = self._registered_slots[qt_slot]
self.client.connector.register(topics, cb=qt_slot, **kwargs)
slot = QtThreadSafeCallback(slot)
self.client.connector.register(topics, cb=slot, **kwargs)
topics_str, _ = self.client.connector._convert_endpointinfo(topics)
qt_slot.topics.update(set(topics_str))
self._slots[slot].update(set(topics_str))
def disconnect_slot(self, slot: Callable, topics: Union[str, list]):
"""
@@ -194,16 +144,16 @@ class BECDispatcher:
# find the right slot to disconnect from ;
# slot callbacks are wrapped in QtThreadSafeCallback objects,
# but the slot we receive here is the original callable
for connected_slot in self._registered_slots.values():
for connected_slot in self._slots:
if connected_slot.cb == slot:
break
else:
return
self.client.connector.unregister(topics, cb=connected_slot)
topics_str, _ = self.client.connector._convert_endpointinfo(topics)
self._registered_slots[connected_slot].topics.difference_update(set(topics_str))
if not self._registered_slots[connected_slot].topics:
del self._registered_slots[connected_slot]
self._slots[connected_slot].difference_update(set(topics_str))
if not self._slots[connected_slot]:
del self._slots[connected_slot]
def disconnect_topics(self, topics: Union[str, list]):
"""
@@ -214,16 +164,11 @@ class BECDispatcher:
"""
self.client.connector.unregister(topics)
topics_str, _ = self.client.connector._convert_endpointinfo(topics)
remove_slots = []
for connected_slot in self._registered_slots.values():
connected_slot.topics.difference_update(set(topics_str))
if not connected_slot.topics:
remove_slots.append(connected_slot)
for connected_slot in remove_slots:
self._registered_slots.pop(connected_slot, None)
for slot in list(self._slots.keys()):
slot_topics = self._slots[slot]
slot_topics.difference_update(set(topics_str))
if not slot_topics:
del self._slots[slot]
def disconnect_all(self, *args, **kwargs):
"""
@@ -233,49 +178,4 @@ class BECDispatcher:
*args: Arbitrary positional arguments
**kwargs: Arbitrary keyword arguments
"""
# pylint: disable=protected-access
self.disconnect_topics(self.client.connector._topics_cb)
def start_cli_server(self, gui_id: str | None = None):
"""
Start the CLI server.
Args:
gui_id(str, optional): The GUI ID. Defaults to None. If None, a unique identifier will be generated.
"""
# pylint: disable=import-outside-toplevel
from bec_widgets.utils.rpc_server import RPCServer
if gui_id is None:
gui_id = self.generate_unique_identifier()
if not self.client.started:
logger.error("Cannot start CLI server without a running client")
return
self.cli_server = RPCServer(gui_id, dispatcher=self, client=self.client)
logger.success(f"Started CLI server with gui_id: {gui_id}")
def stop_cli_server(self):
"""
Stop the CLI server.
"""
if self.cli_server is None:
logger.error("Cannot stop CLI server without starting it first")
return
self.cli_server.shutdown()
self.cli_server = None
logger.success("Stopped CLI server")
@staticmethod
def generate_unique_identifier(length: int = 4) -> str:
"""
Generate a unique identifier for the application.
Args:
length: The length of the identifier. Defaults to 4.
Returns:
str: The unique identifier.
"""
allowed_chars = string.ascii_lowercase + string.digits
return "".join(random.choices(allowed_chars, k=length))

View File

@@ -1,89 +0,0 @@
from __future__ import annotations
import importlib.metadata
import inspect
import pkgutil
from importlib import util as importlib_util
from importlib.machinery import FileFinder, ModuleSpec, SourceFileLoader
from types import ModuleType
from typing import Generator
from bec_widgets.utils.bec_widget import BECWidget
def _submodule_specs(module: ModuleType) -> tuple[ModuleSpec | None, ...]:
"""Return specs for all submodules of the given module."""
return tuple(
module_info.module_finder.find_spec(module_info.name)
for module_info in pkgutil.iter_modules(module.__path__)
if isinstance(module_info.module_finder, FileFinder)
)
def _loaded_submodules_from_specs(
submodule_specs: tuple[ModuleSpec | None, ...],
) -> Generator[ModuleType, None, None]:
"""Load all submodules from the given specs."""
for submodule in (
importlib_util.module_from_spec(spec) for spec in submodule_specs if spec is not None
):
assert isinstance(
submodule.__loader__, SourceFileLoader
), "Module found from FileFinder should have SourceFileLoader!"
submodule.__loader__.exec_module(submodule)
yield submodule
def _submodule_by_name(module: ModuleType, name: str):
for submod in _loaded_submodules_from_specs(_submodule_specs(module)):
if submod.__name__ == name:
return submod
return None
def _get_widgets_from_module(module: ModuleType) -> dict[str, "type[BECWidget]"]:
"""Find any BECWidget subclasses in the given module and return them with their names."""
from bec_widgets.utils.bec_widget import BECWidget # avoid circular import
return dict(
inspect.getmembers(
module,
predicate=lambda item: inspect.isclass(item)
and issubclass(item, BECWidget)
and item is not BECWidget,
)
)
def _all_widgets_from_all_submods(module):
"""Recursively load submodules, find any BECWidgets, and return them all as a flat dict."""
widgets = _get_widgets_from_module(module)
if not hasattr(module, "__path__"):
return widgets
for submod in _loaded_submodules_from_specs(_submodule_specs(module)):
widgets.update(_all_widgets_from_all_submods(submod))
return widgets
def user_widget_plugin() -> ModuleType | None:
plugins = importlib.metadata.entry_points(group="bec.widgets.user_widgets") # type: ignore
return None if len(plugins) == 0 else tuple(plugins)[0].load()
def get_plugin_client_module() -> ModuleType | None:
"""If there is a plugin repository installed, return the client module."""
return _submodule_by_name(plugin, "client") if (plugin := user_widget_plugin()) else None
def get_all_plugin_widgets() -> dict[str, "type[BECWidget]"]:
"""If there is a plugin repository installed, load all widgets from it."""
if plugin := user_widget_plugin():
return _all_widgets_from_all_submods(plugin)
else:
return {}
if __name__ == "__main__": # pragma: no cover
# print(get_all_plugin_widgets())
client = get_plugin_client_module()
...

View File

@@ -1,47 +1,32 @@
"""This custom class is a thin wrapper around the SignalProxy class to allow signal calls to be blocked.
""" This custom class is a thin wrapper around the SignalProxy class to allow signal calls to be blocked.
Unblocking the proxy needs to be done through the slot unblock_proxy. The most likely use case for this class is
when the callback function is potentially initiating a slower progress, i.e. requesting a data analysis routine to
when the callback function is potentially initiating a slower progress, i.e. requesting a data analysis routine to
analyse data. Requesting a new fit may lead to request piling up and an overall slow done of performance. This proxy
will allow you to decide by yourself when to unblock and execute the callback again."""
from pyqtgraph import SignalProxy
from qtpy.QtCore import QTimer, Signal
from bec_widgets.utils.error_popups import SafeSlot
from qtpy.QtCore import Signal, Slot
class BECSignalProxy(SignalProxy):
"""
Thin wrapper around the SignalProxy class to allow signal calls to be blocked,
but arguments still being stored.
"""Thin wrapper around the SignalProxy class to allow signal calls to be blocked, but args still being stored
Args:
*args: Arguments to pass to the SignalProxy class.
rateLimit (int): The rateLimit of the proxy.
timeout (float): The number of seconds after which the proxy automatically
unblocks if still blocked. Default is 10.0 seconds.
**kwargs: Keyword arguments to pass to the SignalProxy class.
*args: Arguments to pass to the SignalProxy class
rateLimit (int): The rateLimit of the proxy
**kwargs: Keyword arguments to pass to the SignalProxy class
Example:
>>> proxy = BECSignalProxy(signal, rate_limit=25, slot=callback)
"""
>>> proxy = BECSignalProxy(signal, rate_limit=25, slot=callback)"""
is_blocked = Signal(bool)
def __init__(self, *args, rateLimit=25, timeout=10.0, **kwargs):
def __init__(self, *args, rateLimit=25, **kwargs):
super().__init__(*args, rateLimit=rateLimit, **kwargs)
self._blocking = False
self.old_args = None
self.new_args = None
# Store timeout value (in seconds)
self._timeout = timeout
# Create a single-shot timer for auto-unblocking
self._timer = QTimer()
self._timer.setSingleShot(True)
self._timer.timeout.connect(self._timeout_unblock)
@property
def blocked(self):
"""Returns if the proxy is blocked"""
@@ -61,30 +46,9 @@ class BECSignalProxy(SignalProxy):
self.old_args = args
super().signalReceived(*args)
self._timer.start(int(self._timeout * 1000))
@SafeSlot()
@Slot()
def unblock_proxy(self):
"""Unblock the proxy, and call the signalReceived method in case there was an update of the args."""
if self.blocked:
self._timer.stop()
self.blocked = False
if self.new_args != self.old_args:
self.signalReceived(*self.new_args)
@SafeSlot()
def _timeout_unblock(self):
"""
Internal method called by the QTimer upon timeout. Unblocks the proxy
automatically if it is still blocked.
"""
if self.blocked:
self.unblock_proxy()
def cleanup(self):
"""
Cleanup the proxy by stopping the timer and disconnecting the timeout signal.
"""
self._timer.stop()
self._timer.timeout.disconnect(self._timeout_unblock)
self._timer.deleteLater()
self.blocked = False
if self.new_args != self.old_args:
self.signalReceived(*self.new_args)

View File

@@ -1,19 +1,13 @@
from __future__ import annotations
from typing import TYPE_CHECKING
import darkdetect
from bec_lib.logger import bec_logger
from qtpy.QtCore import QObject, Slot
from qtpy.QtWidgets import QApplication
from qtpy.QtCore import Slot
from qtpy.QtWidgets import QApplication, QWidget
from bec_widgets.cli.rpc.rpc_register import RPCRegister
from bec_widgets.utils.bec_connector import BECConnector, ConnectionConfig
from bec_widgets.utils.colors import set_theme
if TYPE_CHECKING: # pragma: no cover
from bec_widgets.widgets.containers.dock import BECDock
logger = bec_logger.logger
@@ -23,17 +17,13 @@ class BECWidget(BECConnector):
# The icon name is the name of the icon in the icon theme, typically a name taken
# from fonts.google.com/icons. Override this in subclasses to set the icon name.
ICON_NAME = "widgets"
USER_ACCESS = ["remove"]
# pylint: disable=too-many-arguments
def __init__(
self,
client=None,
config: ConnectionConfig = None,
gui_id: str | None = None,
gui_id: str = None,
theme_update: bool = False,
parent_dock: BECDock | None = None, # TODO should go away -> issue created #473
**kwargs,
):
"""
Base class for all BEC widgets. This class should be used as a mixin class for all BEC widgets, e.g.:
@@ -52,12 +42,11 @@ class BECWidget(BECConnector):
theme_update(bool, optional): Whether to subscribe to theme updates. Defaults to False. When set to True, the
widget's apply_theme method will be called when the theme changes.
"""
super().__init__(
client=client, config=config, gui_id=gui_id, parent_dock=parent_dock, **kwargs
)
if not isinstance(self, QObject):
if not isinstance(self, QWidget):
raise RuntimeError(f"{repr(self)} is not a subclass of QWidget")
super().__init__(client=client, config=config, gui_id=gui_id)
# Set the theme to auto if it is not set yet
app = QApplication.instance()
if not hasattr(app, "theme"):
# DO NOT SET THE THEME TO AUTO! Otherwise, the qwebengineview will segfault
@@ -77,7 +66,7 @@ class BECWidget(BECConnector):
if hasattr(qapp, "theme_signal"):
qapp.theme_signal.theme_updated.connect(self._update_theme)
def _update_theme(self, theme: str | None = None):
def _update_theme(self, theme: str):
"""Update the theme."""
if theme is None:
qapp = QApplication.instance()
@@ -98,16 +87,10 @@ class BECWidget(BECConnector):
def cleanup(self):
"""Cleanup the widget."""
with RPCRegister.delayed_broadcast():
# All widgets need to call super().cleanup() in their cleanup method
logger.info(f"Registry cleanup for widget {self.__class__.__name__}")
self.rpc_register.remove_rpc(self)
def closeEvent(self, event):
"""Wrap the close even to ensure the rpc_register is cleaned up."""
self.rpc_register.remove_rpc(self)
try:
if not self._destroyed:
self.cleanup()
self._destroyed = True
self.cleanup()
finally:
super().closeEvent(event) # pylint: disable=no-member
super().closeEvent(event)

View File

@@ -1,380 +0,0 @@
from __future__ import annotations
import sys
from typing import Literal
import pyqtgraph as pg
from qtpy.QtCore import Property, QEasingCurve, QObject, QPropertyAnimation
from qtpy.QtWidgets import (
QApplication,
QHBoxLayout,
QMainWindow,
QPushButton,
QSizePolicy,
QVBoxLayout,
QWidget,
)
from typeguard import typechecked
from bec_widgets.widgets.containers.layout_manager.layout_manager import LayoutManagerWidget
class DimensionAnimator(QObject):
"""
Helper class to animate the size of a panel widget.
"""
def __init__(self, panel_widget: QWidget, direction: str):
super().__init__()
self.panel_widget = panel_widget
self.direction = direction
self._size = 0
@Property(int)
def panel_width(self):
"""
Returns the current width of the panel widget.
"""
return self._size
@panel_width.setter
def panel_width(self, val: int):
"""
Set the width of the panel widget.
Args:
val(int): The width to set.
"""
self._size = val
self.panel_widget.setFixedWidth(val)
@Property(int)
def panel_height(self):
"""
Returns the current height of the panel widget.
"""
return self._size
@panel_height.setter
def panel_height(self, val: int):
"""
Set the height of the panel widget.
Args:
val(int): The height to set.
"""
self._size = val
self.panel_widget.setFixedHeight(val)
class CollapsiblePanelManager(QObject):
"""
Manager class to handle collapsible panels from a main widget using LayoutManagerWidget.
"""
def __init__(self, layout_manager: LayoutManagerWidget, reference_widget: QWidget, parent=None):
super().__init__(parent)
self.layout_manager = layout_manager
self.reference_widget = reference_widget
self.animations = {}
self.panels = {}
self.direction_settings = {
"left": {"property": b"maximumWidth", "default_size": 200},
"right": {"property": b"maximumWidth", "default_size": 200},
"top": {"property": b"maximumHeight", "default_size": 150},
"bottom": {"property": b"maximumHeight", "default_size": 150},
}
def add_panel(
self,
direction: Literal["left", "right", "top", "bottom"],
panel_widget: QWidget,
target_size: int | None = None,
duration: int = 300,
):
"""
Add a panel widget to the layout manager.
Args:
direction(Literal["left", "right", "top", "bottom"]): Direction of the panel.
panel_widget(QWidget): The panel widget to add.
target_size(int, optional): The target size of the panel. Defaults to None.
duration(int): The duration of the animation in milliseconds. Defaults to 300.
"""
if direction not in self.direction_settings:
raise ValueError("Direction must be one of 'left', 'right', 'top', 'bottom'.")
if target_size is None:
target_size = self.direction_settings[direction]["default_size"]
self.layout_manager.add_widget_relative(
widget=panel_widget, reference_widget=self.reference_widget, position=direction
)
panel_widget.setVisible(False)
# Set initial constraints as flexible
if direction in ["left", "right"]:
panel_widget.setMaximumWidth(0)
panel_widget.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
else:
panel_widget.setMaximumHeight(0)
panel_widget.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
self.panels[direction] = {
"widget": panel_widget,
"direction": direction,
"target_size": target_size,
"duration": duration,
"animator": None,
}
def toggle_panel(
self,
direction: Literal["left", "right", "top", "bottom"],
target_size: int | None = None,
duration: int | None = None,
easing_curve: QEasingCurve = QEasingCurve.InOutQuad,
ensure_max: bool = False,
scale: float | None = None,
animation: bool = True,
):
"""
Toggle the specified panel.
Parameters:
direction (Literal["left", "right", "top", "bottom"]): Direction of the panel to toggle.
target_size (int, optional): Override target size for this toggle.
duration (int, optional): Override the animation duration.
easing_curve (QEasingCurve): Animation easing curve.
ensure_max (bool): If True, animate as a fixed-size panel.
scale (float, optional): If provided, calculate target_size from main widget size.
animation (bool): If False, no animation is performed; panel instantly toggles.
"""
if direction not in self.panels:
raise ValueError(f"No panel found in direction '{direction}'.")
panel_info = self.panels[direction]
panel_widget = panel_info["widget"]
dir_settings = self.direction_settings[direction]
# Determine final target size
if scale is not None:
main_rect = self.reference_widget.geometry()
if direction in ["left", "right"]:
computed_target = int(main_rect.width() * scale)
else:
computed_target = int(main_rect.height() * scale)
final_target_size = computed_target
else:
if target_size is None:
final_target_size = panel_info["target_size"]
else:
final_target_size = target_size
if duration is None:
duration = panel_info["duration"]
expanding_property = dir_settings["property"]
currently_visible = panel_widget.isVisible()
if ensure_max:
if panel_info["animator"] is None:
panel_info["animator"] = DimensionAnimator(panel_widget, direction)
animator = panel_info["animator"]
if direction in ["left", "right"]:
prop_name = b"panel_width"
else:
prop_name = b"panel_height"
else:
animator = None
prop_name = expanding_property
if currently_visible:
# Hide the panel
if ensure_max:
start_value = final_target_size
end_value = 0
finish_callback = lambda w=panel_widget, d=direction: self._after_hide_reset(w, d)
else:
start_value = (
panel_widget.width()
if direction in ["left", "right"]
else panel_widget.height()
)
end_value = 0
finish_callback = lambda w=panel_widget: w.setVisible(False)
else:
# Show the panel
start_value = 0
end_value = final_target_size
finish_callback = None
if ensure_max:
# Fix panel exactly
if direction in ["left", "right"]:
panel_widget.setMinimumWidth(0)
panel_widget.setMaximumWidth(final_target_size)
panel_widget.setSizePolicy(QSizePolicy.Fixed, QSizePolicy.Expanding)
else:
panel_widget.setMinimumHeight(0)
panel_widget.setMaximumHeight(final_target_size)
panel_widget.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Fixed)
else:
# Flexible mode
if direction in ["left", "right"]:
panel_widget.setMinimumWidth(0)
panel_widget.setMaximumWidth(final_target_size)
panel_widget.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
else:
panel_widget.setMinimumHeight(0)
panel_widget.setMaximumHeight(final_target_size)
panel_widget.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
panel_widget.setVisible(True)
if not animation:
# No animation: instantly set final state
if end_value == 0:
# Hiding
if ensure_max:
# Reset after hide
self._after_hide_reset(panel_widget, direction)
else:
panel_widget.setVisible(False)
else:
# Showing
if ensure_max:
# Already set fixed size
if direction in ["left", "right"]:
panel_widget.setFixedWidth(end_value)
else:
panel_widget.setFixedHeight(end_value)
else:
# Just set maximum dimension
if direction in ["left", "right"]:
panel_widget.setMaximumWidth(end_value)
else:
panel_widget.setMaximumHeight(end_value)
return
# With animation
animation = QPropertyAnimation(animator if ensure_max else panel_widget, prop_name)
animation.setDuration(duration)
animation.setStartValue(start_value)
animation.setEndValue(end_value)
animation.setEasingCurve(easing_curve)
if end_value == 0 and finish_callback:
animation.finished.connect(finish_callback)
elif end_value == 0 and not finish_callback:
animation.finished.connect(lambda w=panel_widget: w.setVisible(False))
animation.start()
self.animations[panel_widget] = animation
@typechecked
def _after_hide_reset(
self, panel_widget: QWidget, direction: Literal["left", "right", "top", "bottom"]
):
"""
Reset the panel widget after hiding it in ensure_max mode.
Args:
panel_widget(QWidget): The panel widget to reset.
direction(Literal["left", "right", "top", "bottom"]): The direction of the panel.
"""
# Called after hiding a panel in ensure_max mode
panel_widget.setVisible(False)
if direction in ["left", "right"]:
panel_widget.setMinimumWidth(0)
panel_widget.setMaximumWidth(0)
panel_widget.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
else:
panel_widget.setMinimumHeight(0)
panel_widget.setMaximumHeight(16777215)
panel_widget.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
####################################################################################################
# The following code is for the GUI control panel to interact with the CollapsiblePanelManager.
# It is not covered by any tests as it serves only as an example for the CollapsiblePanelManager class.
####################################################################################################
class MainWindow(QMainWindow): # pragma: no cover
def __init__(self):
super().__init__()
self.setWindowTitle("Panels with ensure_max, scale, and animation toggle")
self.resize(800, 600)
central_widget = QWidget()
self.setCentralWidget(central_widget)
main_layout = QVBoxLayout(central_widget)
main_layout.setContentsMargins(10, 10, 10, 10)
main_layout.setSpacing(10)
# Buttons
buttons_layout = QHBoxLayout()
self.btn_left = QPushButton("Toggle Left (ensure_max=True)")
self.btn_top = QPushButton("Toggle Top (scale=0.5, no animation)")
self.btn_right = QPushButton("Toggle Right (ensure_max=True, scale=0.3)")
self.btn_bottom = QPushButton("Toggle Bottom (no animation)")
buttons_layout.addWidget(self.btn_left)
buttons_layout.addWidget(self.btn_top)
buttons_layout.addWidget(self.btn_right)
buttons_layout.addWidget(self.btn_bottom)
main_layout.addLayout(buttons_layout)
self.layout_manager = LayoutManagerWidget()
main_layout.addWidget(self.layout_manager)
# Main widget
self.main_plot = pg.PlotWidget()
self.main_plot.plot([1, 2, 3, 4], [4, 3, 2, 1])
self.layout_manager.add_widget(self.main_plot, 0, 0)
self.panel_manager = CollapsiblePanelManager(self.layout_manager, self.main_plot)
# Panels
self.left_panel = pg.PlotWidget()
self.left_panel.plot([1, 2, 3], [3, 2, 1])
self.panel_manager.add_panel("left", self.left_panel, target_size=200)
self.right_panel = pg.PlotWidget()
self.right_panel.plot([10, 20, 30], [1, 10, 1])
self.panel_manager.add_panel("right", self.right_panel, target_size=200)
self.top_panel = pg.PlotWidget()
self.top_panel.plot([1, 2, 3], [1, 2, 3])
self.panel_manager.add_panel("top", self.top_panel, target_size=150)
self.bottom_panel = pg.PlotWidget()
self.bottom_panel.plot([2, 4, 6], [10, 5, 10])
self.panel_manager.add_panel("bottom", self.bottom_panel, target_size=150)
# Connect buttons
# Left with ensure_max
self.btn_left.clicked.connect(
lambda: self.panel_manager.toggle_panel("left", ensure_max=True)
)
# Top with scale=0.5 and no animation
self.btn_top.clicked.connect(
lambda: self.panel_manager.toggle_panel("top", scale=0.5, animation=False)
)
# Right with ensure_max, scale=0.3
self.btn_right.clicked.connect(
lambda: self.panel_manager.toggle_panel("right", ensure_max=True, scale=0.3)
)
# Bottom no animation
self.btn_bottom.clicked.connect(
lambda: self.panel_manager.toggle_panel("bottom", target_size=100, animation=False)
)
if __name__ == "__main__": # pragma: no cover
app = QApplication(sys.argv)
w = MainWindow()
w.show()
sys.exit(app.exec())

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
import itertools
import re
from typing import TYPE_CHECKING, Literal
@@ -11,7 +12,7 @@ from pydantic_core import PydanticCustomError
from qtpy.QtGui import QColor
from qtpy.QtWidgets import QApplication
if TYPE_CHECKING: # pragma: no cover
if TYPE_CHECKING:
from bec_qthemes._main import AccentColors
@@ -70,64 +71,15 @@ def apply_theme(theme: Literal["dark", "light"]):
Apply the theme to all pyqtgraph widgets. Do not use this function directly. Use set_theme instead.
"""
app = QApplication.instance()
graphic_layouts = [
child
for top in app.topLevelWidgets()
for child in top.findChildren(pg.GraphicsLayoutWidget)
]
plot_items = [
item
for gl in graphic_layouts
for item in gl.ci.items.keys() # ci is internal pg.GraphicsLayout that hosts all items
if isinstance(item, pg.PlotItem)
]
histograms = [
item
for gl in graphic_layouts
for item in gl.ci.items.keys() # ci is internal pg.GraphicsLayout that hosts all items
if isinstance(item, pg.HistogramLUTItem)
]
# Update background color based on the theme
if theme == "light":
background_color = "#e9ecef" # Subtle contrast for light mode
foreground_color = "#141414"
label_color = "#000000"
axis_color = "#666666"
else:
background_color = "#141414" # Dark mode
foreground_color = "#e9ecef"
label_color = "#FFFFFF"
axis_color = "#CCCCCC"
# update GraphicsLayoutWidget
pg.setConfigOptions(foreground=foreground_color, background=background_color)
for pg_widget in graphic_layouts:
pg_widget.setBackground(background_color)
# update PlotItems
for plot_item in plot_items:
for axis in ["left", "right", "top", "bottom"]:
plot_item.getAxis(axis).setPen(pg.mkPen(color=axis_color))
plot_item.getAxis(axis).setTextPen(pg.mkPen(color=label_color))
# Change title color
plot_item.titleLabel.setText(plot_item.titleLabel.text, color=label_color)
# Change legend color
if hasattr(plot_item, "legend") and plot_item.legend is not None:
plot_item.legend.setLabelTextColor(label_color)
# if legend is in plot item and theme is changed, has to be like that because of pg opt logic
for sample, label in plot_item.legend.items:
label_text = label.text
label.setText(label_text, color=label_color)
# update HistogramLUTItem
for histogram in histograms:
histogram.axis.setPen(pg.mkPen(color=axis_color))
histogram.axis.setTextPen(pg.mkPen(color=label_color))
# go through all pyqtgraph widgets and set background
children = itertools.chain.from_iterable(
top.findChildren(pg.GraphicsLayoutWidget) for top in app.topLevelWidgets()
)
pg.setConfigOptions(
foreground="d" if theme == "dark" else "k", background="k" if theme == "dark" else "w"
)
for pg_widget in children:
pg_widget.setBackground("k" if theme == "dark" else "w")
# now define stylesheet according to theme and apply it
style = bec_qthemes.load_stylesheet(theme)

View File

@@ -1,54 +1,30 @@
from __future__ import annotations
from typing import Any, Type
import itertools
from typing import Type
from qtpy.QtWidgets import QWidget
from bec_widgets.cli.client_utils import BECGuiClient
class WidgetContainerUtils:
# We need one handler that checks if a WIDGET of a given name is already created for that DOCKAREA
# 1. If the name exists, then it depends whether the name was auto-generated -> add _1 to the name
# or alternatively raise an error that it can't be added again ( just raise an error)
# 2. Dock names in between docks should also be unique
@staticmethod
def has_name_valid_chars(name: str) -> bool:
"""Check if the name is valid.
def generate_unique_widget_id(container: dict, prefix: str = "widget") -> str:
"""
Generate a unique widget ID.
Args:
name(str): The name to be checked.
container(dict): The container of widgets.
prefix(str): The prefix of the widget ID.
Returns:
bool: True if the name is valid, False otherwise.
widget_id(str): The unique widget ID.
"""
if not name or len(name) > 256:
return False # Don't accept empty names or names longer than 256 characters
check_value = name.replace("_", "").replace("-", "")
if not check_value.isalnum() or not check_value.isascii():
return False
return True
@staticmethod
def generate_unique_name(name: str, list_of_names: list[str] | None = None) -> str:
"""Generate a unique ID.
Args:
name(str): The name of the widget.
Returns:
tuple (str): The unique name
"""
if list_of_names is None:
list_of_names = []
ii = 0
while ii < 1000: # 1000 is arbritrary!
name_candidate = f"{name}_{ii}"
if name_candidate not in list_of_names:
return name_candidate
ii += 1
raise ValueError("Could not generate a unique name after within 1000 attempts.")
existing_ids = set(container.keys())
for i in itertools.count(1):
widget_id = f"{prefix}_{i}"
if widget_id not in existing_ids:
return widget_id
@staticmethod
def find_first_widget_by_class(
@@ -72,36 +48,3 @@ class WidgetContainerUtils:
return None
else:
raise ValueError(f"No widget of class {widget_class} found.")
@staticmethod
def name_is_protected(name: str, container: Any = None) -> bool:
"""
Check if the name is not protected.
Args:
name(str): The name to be checked.
Returns:
bool: True if the name is not protected, False otherwise.
"""
if container is None:
container = BECGuiClient
gui_client_methods = set(filter(lambda x: not x.startswith("_"), dir(container)))
return name in gui_client_methods
@staticmethod
def raise_for_invalid_name(name: str, container: Any = None) -> None:
"""
Check if the name is valid. If not, raise a ValueError.
Args:
name(str): The name to be checked.
Raises:
ValueError: If the name is not valid.
"""
if not WidgetContainerUtils.has_name_valid_chars(name):
raise ValueError(
f"Name '{name}' contains invalid characters. Only alphanumeric characters, underscores, and dashes are allowed."
)
if WidgetContainerUtils.name_is_protected(name, container):
raise ValueError(f"Name '{name}' is protected. Please choose another name.")

View File

@@ -1,7 +1,4 @@
from __future__ import annotations
from collections import defaultdict
from typing import Any
import numpy as np
import pyqtgraph as pg
@@ -9,16 +6,13 @@ from qtpy.QtCore import QObject, Qt, Signal, Slot
from qtpy.QtWidgets import QApplication
class CrosshairScatterItem(pg.ScatterPlotItem):
class NonDownsamplingScatterPlotItem(pg.ScatterPlotItem):
def setDownsampling(self, ds=None, auto=None, method=None):
pass
def setClipToView(self, state):
pass
def setAlpha(self, *args, **kwargs):
pass
class Crosshair(QObject):
# QT Position of mouse cursor
@@ -53,15 +47,9 @@ class Crosshair(QObject):
self.v_line.skip_auto_range = True
self.h_line = pg.InfiniteLine(angle=0, movable=False)
self.h_line.skip_auto_range = True
# Add custom attribute to identify crosshair lines
self.v_line.is_crosshair = True
self.h_line.is_crosshair = True
self.plot_item.addItem(self.v_line, ignoreBounds=True)
self.plot_item.addItem(self.h_line, ignoreBounds=True)
# Initialize highlighted curve in a case of multiple curves
self.highlighted_curve_index = None
# Add TextItem to display coordinates
self.coord_label = pg.TextItem("", anchor=(1, 1), fill=(0, 0, 0, 100))
self.coord_label.setVisible(False) # Hide initially
@@ -82,7 +70,6 @@ class Crosshair(QObject):
self.plot_item.ctrl.downsampleSpin.valueChanged.connect(self.clear_markers)
# Initialize markers
self.items = []
self.marker_moved_1d = {}
self.marker_clicked_1d = {}
self.marker_2d = None
@@ -126,103 +113,61 @@ class Crosshair(QObject):
self.coord_label.fill = pg.mkBrush(label_bg_color)
self.coord_label.border = pg.mkPen(None)
@Slot(int)
def update_highlighted_curve(self, curve_index: int):
"""
Update the highlighted curve in the case of multiple curves in a plot item.
Args:
curve_index(int): The index of curve to highlight
"""
self.highlighted_curve_index = curve_index
self.clear_markers()
self.update_markers()
def update_markers(self):
"""Update the markers for the crosshair, creating new ones if necessary."""
if self.highlighted_curve_index is not None and hasattr(self.plot_item, "visible_curves"):
# Focus on the highlighted curve only
self.items = [self.plot_item.visible_curves[self.highlighted_curve_index]]
else:
# Handle all curves
self.items = self.plot_item.items
# Create or update markers
for item in self.items:
# Create new markers
for item in self.plot_item.items:
if isinstance(item, pg.PlotDataItem): # 1D plot
if item.name() in self.marker_moved_1d:
continue
pen = item.opts["pen"]
color = pen.color() if hasattr(pen, "color") else pg.mkColor(pen)
name = item.name() or str(id(item))
if name in self.marker_moved_1d:
# Update existing markers
marker_moved = self.marker_moved_1d[name]
marker_moved.setPen(pg.mkPen(color))
# Update clicked markers' brushes
for marker_clicked in self.marker_clicked_1d[name]:
alpha = marker_clicked.opts["brush"].color().alpha()
marker_clicked.setBrush(
pg.mkBrush(color.red(), color.green(), color.blue(), alpha)
)
# Update z-values
marker_moved.setZValue(item.zValue() + 1)
for marker_clicked in self.marker_clicked_1d[name]:
marker_clicked.setZValue(item.zValue() + 1)
else:
# Create new markers
marker_moved = CrosshairScatterItem(
size=10, pen=pg.mkPen(color), brush=pg.mkBrush(None)
)
marker_moved.skip_auto_range = True
marker_moved.is_crosshair = True
self.marker_moved_1d[name] = marker_moved
self.plot_item.addItem(marker_moved)
# Set marker z-value higher than the curve
marker_moved.setZValue(item.zValue() + 1)
marker_moved = NonDownsamplingScatterPlotItem(
size=10, pen=pg.mkPen(color), brush=pg.mkBrush(None)
)
marker_moved.skip_auto_range = True
self.marker_moved_1d[item.name()] = marker_moved
self.plot_item.addItem(marker_moved)
# Create glowing effect markers for clicked events
for size, alpha in [(18, 64), (14, 128), (10, 255)]:
marker_clicked = NonDownsamplingScatterPlotItem(
size=size,
pen=pg.mkPen(None),
brush=pg.mkBrush(color.red(), color.green(), color.blue(), alpha),
)
marker_clicked.skip_auto_range = True
self.marker_clicked_1d[item.name()] = marker_clicked
self.plot_item.addItem(marker_clicked)
# Create glowing effect markers for clicked events
marker_clicked_list = []
for size, alpha in [(18, 64), (14, 128), (10, 255)]:
marker_clicked = CrosshairScatterItem(
size=size,
pen=pg.mkPen(None),
brush=pg.mkBrush(color.red(), color.green(), color.blue(), alpha),
)
marker_clicked.skip_auto_range = True
marker_clicked.is_crosshair = True
self.plot_item.addItem(marker_clicked)
marker_clicked.setZValue(item.zValue() + 1)
marker_clicked_list.append(marker_clicked)
self.marker_clicked_1d[name] = marker_clicked_list
elif isinstance(item, pg.ImageItem): # 2D plot
if self.marker_2d is not None:
continue
self.marker_2d = pg.ROI(
[0, 0], size=[1, 1], pen=pg.mkPen("r", width=2), movable=False
)
self.marker_2d.skip_auto_range = True
self.plot_item.addItem(self.marker_2d)
def snap_to_data(
self, x: float, y: float
) -> tuple[None, None] | tuple[defaultdict[Any, list], defaultdict[Any, list]]:
def snap_to_data(self, x, y) -> tuple[defaultdict[list], defaultdict[list]]:
"""
Finds the nearest data points to the given x and y coordinates.
Args:
x(float): The x-coordinate of the mouse cursor
y(float): The y-coordinate of the mouse cursor
x: The x-coordinate of the mouse cursor
y: The y-coordinate of the mouse cursor
Returns:
tuple: x and y values snapped to the nearest data
"""
y_values = defaultdict(list)
x_values = defaultdict(list)
image_2d = None
# Iterate through items in the plot
for item in self.items:
for item in self.plot_item.items:
if isinstance(item, pg.PlotDataItem): # 1D plot
name = item.name() or str(id(item))
name = item.name()
plot_data = item._getDisplayDataset()
if plot_data is None:
continue
@@ -241,9 +186,9 @@ class Crosshair(QObject):
y_values[name] = closest_y
x_values[name] = closest_x
elif isinstance(item, pg.ImageItem): # 2D plot
name = item.config.monitor or str(id(item))
name = item.config.monitor
image_2d = item.image
# Clip the x and y values to the image dimensions to avoid out of bounds errors
# clip the x and y values to the image dimensions to avoid out of bounds errors
y_values[name] = int(np.clip(y, 0, image_2d.shape[1] - 1))
x_values[name] = int(np.clip(x, 0, image_2d.shape[0] - 1))
@@ -311,9 +256,9 @@ class Crosshair(QObject):
# not sure how we got here, but just to be safe...
return
for item in self.items:
for item in self.plot_item.items:
if isinstance(item, pg.PlotDataItem):
name = item.name() or str(id(item))
name = item.name()
x, y = x_snap_values[name], y_snap_values[name]
if x is None or y is None:
continue
@@ -326,7 +271,7 @@ class Crosshair(QObject):
)
self.coordinatesChanged1D.emit(coordinate_to_emit)
elif isinstance(item, pg.ImageItem):
name = item.config.monitor or str(id(item))
name = item.config.monitor
x, y = x_snap_values[name], y_snap_values[name]
if x is None or y is None:
continue
@@ -364,14 +309,13 @@ class Crosshair(QObject):
# not sure how we got here, but just to be safe...
return
for item in self.items:
for item in self.plot_item.items:
if isinstance(item, pg.PlotDataItem):
name = item.name() or str(id(item))
name = item.name()
x, y = x_snap_values[name], y_snap_values[name]
if x is None or y is None:
continue
for marker_clicked in self.marker_clicked_1d[name]:
marker_clicked.setData([x], [y])
self.marker_clicked_1d[name].setData([x], [y])
x_snapped_scaled, y_snapped_scaled = self.scale_emitted_coordinates(x, y)
coordinate_to_emit = (
name,
@@ -380,7 +324,7 @@ class Crosshair(QObject):
)
self.coordinatesClicked1D.emit(coordinate_to_emit)
elif isinstance(item, pg.ImageItem):
name = item.config.monitor or str(id(item))
name = item.config.monitor
x, y = x_snap_values[name], y_snap_values[name]
if x is None or y is None:
continue
@@ -393,12 +337,9 @@ class Crosshair(QObject):
def clear_markers(self):
"""Clears the markers from the plot."""
for marker in self.marker_moved_1d.values():
self.plot_item.removeItem(marker)
for markers in self.marker_clicked_1d.values():
for marker in markers:
self.plot_item.removeItem(marker)
self.marker_moved_1d.clear()
self.marker_clicked_1d.clear()
marker.clear()
for marker in self.marker_clicked_1d.values():
marker.clear()
def scale_emitted_coordinates(self, x, y):
"""Scales the emitted coordinates if the axes are in log scale.
@@ -424,17 +365,9 @@ class Crosshair(QObject):
"""
x, y = pos
x_scaled, y_scaled = self.scale_emitted_coordinates(x, y)
text = f"({x_scaled:.{self.precision}g}, {y_scaled:.{self.precision}g})"
for item in self.items:
if isinstance(item, pg.ImageItem):
image = item.image
ix = int(np.clip(x, 0, image.shape[0] - 1))
iy = int(np.clip(y, 0, image.shape[1] - 1))
intensity = image[ix, iy]
text += f"\nIntensity: {intensity:.{self.precision}g}"
break
# Update coordinate label
self.coord_label.setText(text)
# # Update coordinate label
self.coord_label.setText(f"({x_scaled:.{self.precision}g}, {y_scaled:.{self.precision}g})")
self.coord_label.setPos(x, y)
self.coord_label.setVisible(True)
@@ -450,9 +383,6 @@ class Crosshair(QObject):
self.clear_markers()
def cleanup(self):
if self.marker_2d is not None:
self.plot_item.removeItem(self.marker_2d)
self.marker_2d = None
self.plot_item.removeItem(self.v_line)
self.plot_item.removeItem(self.h_line)
self.plot_item.removeItem(self.coord_label)

View File

@@ -22,9 +22,7 @@ class EntryValidator:
if entry is None or entry == "":
entry = next(iter(device._hints), name) if hasattr(device, "_hints") else name
if entry not in description:
raise ValueError(
f"Entry '{entry}' not found in device '{name}' signals. Available signals: {description.keys()}"
)
raise ValueError(f"Entry '{entry}' not found in device '{name}' signals")
return entry

View File

@@ -1,72 +0,0 @@
from __future__ import annotations
from bec_qthemes import material_icon
from qtpy.QtWidgets import (
QFrame,
QHBoxLayout,
QLabel,
QLayout,
QSizePolicy,
QToolButton,
QVBoxLayout,
QWidget,
)
from bec_widgets.utils.error_popups import SafeProperty, SafeSlot
class ExpandableGroupFrame(QFrame):
EXPANDED_ICON_NAME: str = "collapse_all"
COLLAPSED_ICON_NAME: str = "expand_all"
def __init__(self, title: str, parent: QWidget | None = None, expanded: bool = True) -> None:
super().__init__(parent=parent)
self._expanded = expanded
self.setFrameStyle(QFrame.Shape.StyledPanel | QFrame.Shadow.Plain)
self.setSizePolicy(QSizePolicy.Policy.Minimum, QSizePolicy.Policy.Minimum)
self._layout = QVBoxLayout()
self._layout.setContentsMargins(0, 0, 0, 0)
self.setLayout(self._layout)
self._title_layout = QHBoxLayout()
self._layout.addLayout(self._title_layout)
self._expansion_button = QToolButton()
self._update_icon()
self._title = QLabel(f"<b>{title}</b>")
self._title_layout.addWidget(self._expansion_button)
self._title_layout.addWidget(self._title)
self._contents = QWidget(self)
self._layout.addWidget(self._contents)
self._expansion_button.clicked.connect(self.switch_expanded_state)
self.expanded = self._expanded # type: ignore
def set_layout(self, layout: QLayout) -> None:
self._contents.setLayout(layout)
self._contents.layout().setContentsMargins(0, 0, 0, 0) # type: ignore
@SafeSlot()
def switch_expanded_state(self):
self.expanded = not self.expanded # type: ignore
self._update_icon()
@SafeProperty(bool)
def expanded(self): # type: ignore
return self._expanded
@expanded.setter
def expanded(self, expanded: bool):
self._expanded = expanded
self._contents.setVisible(expanded)
self.updateGeometry()
def _update_icon(self):
self._expansion_button.setIcon(
material_icon(icon_name=self.EXPANDED_ICON_NAME, size=(10, 10), convert_to_pixmap=False)
if self.expanded
else material_icon(
icon_name=self.COLLAPSED_ICON_NAME, size=(10, 10), convert_to_pixmap=False
)
)

View File

@@ -1,182 +0,0 @@
from __future__ import annotations
from decimal import Decimal
from types import NoneType
from bec_lib.logger import bec_logger
from bec_qthemes import material_icon
from pydantic import BaseModel, ValidationError
from qtpy.QtCore import Signal # type: ignore
from qtpy.QtWidgets import QGridLayout, QLabel, QLayout, QVBoxLayout, QWidget
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.compact_popup import CompactPopupWidget
from bec_widgets.utils.forms_from_types.items import FormItemSpec, widget_from_type
logger = bec_logger.logger
class TypedForm(BECWidget, QWidget):
PLUGIN = True
ICON_NAME = "list_alt"
value_changed = Signal()
RPC = False
def __init__(
self,
parent=None,
items: list[tuple[str, type]] | None = None,
form_item_specs: list[FormItemSpec] | None = None,
client=None,
**kwargs,
):
"""Widget with a list of form items based on a list of types.
Args:
items (list[tuple[str, type]]): list of tuples of a name for the field and its type.
Should be a type supported by the logic in items.py
form_item_specs (list[FormItemSpec]): list of form item specs, equivalent to items.
only one of items or form_item_specs should be
supplied.
"""
if (items is not None and form_item_specs is not None) or (
items is None and form_item_specs is None
):
raise ValueError("Must specify one and only one of items and form_item_specs")
super().__init__(parent=parent, client=client, **kwargs)
self._items = (
form_item_specs
if form_item_specs is not None
else [
FormItemSpec(name=name, item_type=item_type)
for name, item_type in items # type: ignore
]
)
self._layout = QVBoxLayout()
self._layout.setContentsMargins(0, 0, 0, 0)
self.setLayout(self._layout)
self._form_grid_container = QWidget(parent=self)
self._form_grid = QWidget(parent=self._form_grid_container)
self._layout.addWidget(self._form_grid_container)
self._form_grid_container.setLayout(QVBoxLayout())
self._form_grid.setLayout(self._new_grid_layout())
self.populate()
def populate(self):
self._clear_grid()
for r, item in enumerate(self._items):
self._add_griditem(item, r)
def _add_griditem(self, item: FormItemSpec, row: int):
grid = self._form_grid.layout()
label = QLabel(item.name)
label.setProperty("_model_field_name", item.name)
label.setToolTip(item.info.description or item.name)
grid.addWidget(label, row, 0)
widget = widget_from_type(item.item_type)(parent=self, spec=item)
widget.valueChanged.connect(self.value_changed)
grid.addWidget(widget, row, 1)
def _dict_from_grid(self) -> dict[str, str | int | float | Decimal | bool]:
grid: QGridLayout = self._form_grid.layout() # type: ignore
return {
grid.itemAtPosition(i, 0)
.widget()
.property("_model_field_name"): grid.itemAtPosition(i, 1)
.widget()
.getValue() # type: ignore # we only add 'DynamicFormItem's here
for i in range(grid.rowCount())
}
def _clear_grid(self):
if (old_layout := self._form_grid.layout()) is not None:
while old_layout.count():
item = old_layout.takeAt(0)
widget = item.widget()
if widget is not None:
widget.deleteLater()
old_layout.deleteLater()
self._form_grid.deleteLater()
self._form_grid = QWidget()
self._form_grid.setLayout(self._new_grid_layout())
self._form_grid_container.layout().addWidget(self._form_grid)
self._form_grid.adjustSize()
self._form_grid_container.adjustSize()
self.adjustSize()
def _new_grid_layout(self):
new_grid = QGridLayout()
new_grid.setContentsMargins(0, 0, 0, 0)
new_grid.setSizeConstraint(QLayout.SizeConstraint.SetFixedSize)
return new_grid
class PydanticModelForm(TypedForm):
metadata_updated = Signal(dict)
metadata_cleared = Signal(NoneType)
def __init__(self, parent=None, metadata_model: type[BaseModel] = None, client=None, **kwargs):
"""
A form generated from a pydantic model.
Args:
metadata_model (type[BaseModel]): the model class for which to generate a form.
"""
self._md_schema = metadata_model
super().__init__(parent=parent, form_item_specs=self._form_item_specs(), client=client)
self._validity = CompactPopupWidget()
self._validity.compact_view = True # type: ignore
self._validity.label = "Metadata validity" # type: ignore
self._validity.compact_show_popup.setIcon(
material_icon(icon_name="info", size=(10, 10), convert_to_pixmap=False)
)
self._validity_message = QLabel("Not yet validated")
self._validity.addWidget(self._validity_message)
self._layout.addWidget(self._validity)
self.value_changed.connect(self.validate_form)
def set_schema(self, schema: type[BaseModel]):
self._md_schema = schema
self.populate()
def _form_item_specs(self):
return [
FormItemSpec(name=name, info=info, item_type=info.annotation)
for name, info in self._md_schema.model_fields.items()
]
def update_items_from_schema(self):
self._items = self._form_item_specs()
def populate(self):
self.update_items_from_schema()
super().populate()
def get_form_data(self):
"""Get the entered metadata as a dict."""
return self._dict_from_grid()
def validate_form(self, *_) -> bool:
"""validate the currently entered metadata against the pydantic schema.
If successful, returns on metadata_emitted and returns true.
Otherwise, emits on metadata_cleared and returns false."""
try:
metadata_dict = self.get_form_data()
self._md_schema.model_validate(metadata_dict)
self._validity.set_global_state("success")
self._validity_message.setText("No errors!")
self.metadata_updated.emit(metadata_dict)
return True
except ValidationError as e:
self._validity.set_global_state("emergency")
self._validity_message.setText(str(e))
self.metadata_cleared.emit(None)
return False

View File

@@ -1,286 +0,0 @@
from __future__ import annotations
from abc import abstractmethod
from decimal import Decimal
from types import UnionType
from typing import Callable, Protocol
from bec_lib.logger import bec_logger
from bec_qthemes import material_icon
from pydantic import BaseModel, ConfigDict, Field
from pydantic.fields import FieldInfo
from qtpy.QtCore import Signal # type: ignore
from qtpy.QtWidgets import (
QApplication,
QButtonGroup,
QCheckBox,
QDoubleSpinBox,
QGridLayout,
QHBoxLayout,
QLabel,
QLayout,
QLineEdit,
QRadioButton,
QSpinBox,
QToolButton,
QWidget,
)
from bec_widgets.widgets.editors.scan_metadata._util import (
clearable_required,
field_default,
field_limits,
field_maxlen,
field_minlen,
field_precision,
)
logger = bec_logger.logger
class FormItemSpec(BaseModel):
"""
The specification for an item in a dynamically generated form. Uses a pydantic FieldInfo
to store most annotation info, since one of the main purposes is to store data for
forms genrated from pydantic models, but can also be composed from other sources or by hand.
"""
model_config = ConfigDict(arbitrary_types_allowed=True)
item_type: type | UnionType
name: str
info: FieldInfo = FieldInfo()
class ClearableBoolEntry(QWidget):
stateChanged = Signal()
def __init__(self, parent: QWidget | None = None) -> None:
super().__init__(parent)
self._layout = QHBoxLayout()
self._layout.setContentsMargins(0, 0, 0, 0)
self.setLayout(self._layout)
self._layout.setSizeConstraint(QLayout.SizeConstraint.SetFixedSize)
self._entry = QButtonGroup()
self._true = QRadioButton("true", parent=self)
self._false = QRadioButton("false", parent=self)
for button in [self._true, self._false]:
self._layout.addWidget(button)
self._entry.addButton(button)
button.toggled.connect(self.stateChanged)
def clear(self):
self._entry.setExclusive(False)
self._true.setChecked(False)
self._false.setChecked(False)
self._entry.setExclusive(True)
def isChecked(self) -> bool | None:
if not self._true.isChecked() and not self._false.isChecked():
return None
return self._true.isChecked()
def setChecked(self, value: bool | None):
if value is None:
self.clear()
elif value:
self._true.setChecked(True)
self._false.setChecked(False)
else:
self._true.setChecked(False)
self._false.setChecked(True)
def setToolTip(self, tooltip: str):
self._true.setToolTip(tooltip)
self._false.setToolTip(tooltip)
class DynamicFormItem(QWidget):
valueChanged = Signal()
def __init__(self, parent: QWidget | None = None, *, spec: FormItemSpec) -> None:
super().__init__(parent)
self._spec = spec
self._layout = QHBoxLayout()
self._layout.setContentsMargins(0, 0, 0, 0)
self._layout.setSizeConstraint(QLayout.SizeConstraint.SetMaximumSize)
self._default = field_default(self._spec.info)
self._desc = self._spec.info.description
self.setLayout(self._layout)
self._add_main_widget()
if clearable_required(spec.info):
self._add_clear_button()
@abstractmethod
def getValue(self): ...
@abstractmethod
def setValue(self, value): ...
@abstractmethod
def _add_main_widget(self) -> None:
"""Add the main data entry widget to self._main_widget and appply any
constraints from the field info"""
def _describe(self, pad=" "):
return pad + (self._desc if self._desc else "")
def _add_clear_button(self):
self._clear_button = QToolButton()
self._clear_button.setIcon(
material_icon(icon_name="close", size=(10, 10), convert_to_pixmap=False)
)
self._layout.addWidget(self._clear_button)
# the widget added in _add_main_widget must implement .clear() if value is not required
self._clear_button.setToolTip("Clear value or reset to default.")
self._clear_button.clicked.connect(self._main_widget.clear) # type: ignore
def _value_changed(self, *_, **__):
self.valueChanged.emit()
class StrMetadataField(DynamicFormItem):
def __init__(self, parent: QWidget | None = None, *, spec: FormItemSpec) -> None:
super().__init__(parent=parent, spec=spec)
self._main_widget.textChanged.connect(self._value_changed)
def _add_main_widget(self) -> None:
self._main_widget = QLineEdit()
self._layout.addWidget(self._main_widget)
min_length, max_length = (field_minlen(self._spec.info), field_maxlen(self._spec.info))
if max_length:
self._main_widget.setMaxLength(max_length)
self._main_widget.setToolTip(
f"(length min: {min_length} max: {max_length}){self._describe()}"
)
if self._default:
self._main_widget.setText(self._default)
self._add_clear_button()
def getValue(self):
if self._main_widget.text() == "":
return self._default
return self._main_widget.text()
def setValue(self, value: str):
if value is None:
self._main_widget.setText("")
self._main_widget.setText(value)
class IntMetadataField(DynamicFormItem):
def __init__(self, parent: QWidget | None = None, *, spec: FormItemSpec) -> None:
super().__init__(parent=parent, spec=spec)
self._main_widget.textChanged.connect(self._value_changed)
def _add_main_widget(self) -> None:
self._main_widget = QSpinBox()
self._layout.addWidget(self._main_widget)
min_, max_ = field_limits(self._spec.info, int)
self._main_widget.setMinimum(min_)
self._main_widget.setMaximum(max_)
self._main_widget.setToolTip(f"(range {min_} to {max_}){self._describe()}")
if self._default is not None:
self._main_widget.setValue(self._default)
self._add_clear_button()
else:
self._main_widget.clear()
def getValue(self):
if self._main_widget.text() == "":
return self._default
return self._main_widget.value()
def setValue(self, value: int):
if value is None:
self._main_widget.clear()
self._main_widget.setValue(value)
class FloatDecimalMetadataField(DynamicFormItem):
def __init__(self, parent: QWidget | None = None, *, spec: FormItemSpec) -> None:
super().__init__(parent=parent, spec=spec)
self._main_widget.textChanged.connect(self._value_changed)
def _add_main_widget(self) -> None:
self._main_widget = QDoubleSpinBox()
self._layout.addWidget(self._main_widget)
min_, max_ = field_limits(self._spec.info, int)
self._main_widget.setMinimum(min_)
self._main_widget.setMaximum(max_)
precision = field_precision(self._spec.info)
if precision:
self._main_widget.setDecimals(precision)
minstr = f"{float(min_):.3f}" if abs(min_) <= 1000 else f"{float(min_):.3e}"
maxstr = f"{float(max_):.3f}" if abs(max_) <= 1000 else f"{float(max_):.3e}"
self._main_widget.setToolTip(f"(range {minstr} to {maxstr}){self._describe()}")
if self._default is not None:
self._main_widget.setValue(self._default)
self._add_clear_button()
else:
self._main_widget.clear()
def getValue(self):
if self._main_widget.text() == "":
return self._default
return self._main_widget.value()
def setValue(self, value: float):
if value is None:
self._main_widget.clear()
self._main_widget.setValue(value)
class BoolMetadataField(DynamicFormItem):
def __init__(self, *, parent: QWidget | None = None, spec: FormItemSpec) -> None:
super().__init__(parent=parent, spec=spec)
self._main_widget.stateChanged.connect(self._value_changed)
def _add_main_widget(self) -> None:
if clearable_required(self._spec.info):
self._main_widget = ClearableBoolEntry()
else:
self._main_widget = QCheckBox()
self._layout.addWidget(self._main_widget)
self._main_widget.setToolTip(self._describe(""))
self._main_widget.setChecked(self._default) # type: ignore # if there is no default then it will be ClearableBoolEntry and can be set with None
def getValue(self):
return self._main_widget.isChecked()
def setValue(self, value):
self._main_widget.setChecked(value)
def widget_from_type(annotation: type | UnionType | None) -> type[DynamicFormItem]:
if annotation in [str, str | None]:
return StrMetadataField
if annotation in [int, int | None]:
return IntMetadataField
if annotation in [float, float | None, Decimal, Decimal | None]:
return FloatDecimalMetadataField
if annotation in [bool, bool | None]:
return BoolMetadataField
else:
logger.warning(f"Type {annotation} is not (yet) supported in metadata form creation.")
return StrMetadataField
if __name__ == "__main__": # pragma: no cover
class TestModel(BaseModel):
value1: str | None = Field(None)
value2: bool | None = Field(None)
value3: bool = Field(True)
value4: int = Field(123)
value5: int | None = Field()
app = QApplication([])
w = QWidget()
layout = QGridLayout()
w.setLayout(layout)
for i, (field_name, info) in enumerate(TestModel.model_fields.items()):
layout.addWidget(QLabel(field_name), i, 0)
layout.addWidget(widget_from_type(info.annotation)(info), i, 1)
w.show()
app.exec()

View File

@@ -1,30 +1,17 @@
import inspect
import os
import re
from typing import NamedTuple
from qtpy.QtCore import QObject
from bec_widgets.utils.name_utils import pascal_to_snake
EXCLUDED_PLUGINS = ["BECConnector", "BECDockArea", "BECDock", "BECFigure"]
class PluginFilenames(NamedTuple):
register: str
plugin: str
pyproj: str
def plugin_filenames(name: str) -> PluginFilenames:
return PluginFilenames(f"register_{name}.py", f"{name}_plugin.py", f"{name}.pyproject")
class DesignerPluginInfo:
def __init__(self, plugin_class):
self.plugin_class = plugin_class
self.plugin_name_pascal = plugin_class.__name__
self.plugin_name_snake = pascal_to_snake(self.plugin_name_pascal)
self.plugin_name_snake = self.pascal_to_snake(self.plugin_name_pascal)
self.widget_import = f"from {plugin_class.__module__} import {self.plugin_name_pascal}"
plugin_module = (
".".join(plugin_class.__module__.split(".")[:-1]) + f".{self.plugin_name_snake}_plugin"
@@ -40,6 +27,21 @@ class DesignerPluginInfo:
self.base_path = os.path.dirname(inspect.getfile(plugin_class))
@staticmethod
def pascal_to_snake(name: str) -> str:
"""
Convert PascalCase to snake_case.
Args:
name (str): The name to be converted.
Returns:
str: The converted name.
"""
s1 = re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", name)
s2 = re.sub(r"([A-Z]+)([A-Z][a-z])", r"\1_\2", s1)
return s2.lower()
class DesignerPluginGenerator:
def __init__(self, widget: type):
@@ -51,15 +53,11 @@ class DesignerPluginGenerator:
self._excluded = True
return
self.templates: dict[str, str] = {}
self.templates = {}
self.template_path = os.path.join(
os.path.dirname(os.path.abspath(__file__)), "plugin_templates"
)
@property
def filenames(self):
return plugin_filenames(self.info.plugin_name_snake)
def run(self, validate=True):
if self._excluded:
print(f"Plugin {self.widget.__name__} is excluded from generation.")
@@ -109,33 +107,31 @@ class DesignerPluginGenerator:
or bool(init_source.find("super().__init__(parent)") > 0)
)
# for the new style classes, we only have one super call. We can therefore check if the
# number of __init__ calls is 2 (the class itself and the super class)
num_inits = re.findall(r"__init__", init_source)
if len(num_inits) == 2 and not super_init_found:
super_init_found = bool(
init_source.find("super().__init__(parent=parent") > 0
or init_source.find("super().__init__(parent,") > 0
or init_source.find("super().__init__(parent)") > 0
)
if not cls_init_found and not super_init_found:
raise ValueError(
f"Widget class {self.widget.__name__} must call the super constructor with parent."
)
def _write_file(self, name: str, contents: str):
with open(os.path.join(self.info.base_path, name), "w", encoding="utf-8") as f:
f.write(contents)
def _format(self, name: str):
return self.templates[name].format(**self.info.__dict__)
def _write_templates(self):
self._write_file(self.filenames.register, self._format("register"))
self._write_file(self.filenames.plugin, self._format("plugin"))
pyproj = str({"files": [f"{self.info.plugin_class.__module__.split('.')[-1]}.py"]})
self._write_file(self.filenames.pyproj, pyproj)
self._write_register()
self._write_plugin()
self._write_pyproject()
def _write_register(self):
file_path = os.path.join(self.info.base_path, f"register_{self.info.plugin_name_snake}.py")
with open(file_path, "w", encoding="utf-8") as f:
f.write(self.templates["register"].format(**self.info.__dict__))
def _write_plugin(self):
file_path = os.path.join(self.info.base_path, f"{self.info.plugin_name_snake}_plugin.py")
with open(file_path, "w", encoding="utf-8") as f:
f.write(self.templates["plugin"].format(**self.info.__dict__))
def _write_pyproject(self):
file_path = os.path.join(self.info.base_path, f"{self.info.plugin_name_snake}.pyproject")
out = {"files": [f"{self.info.plugin_class.__module__.split('.')[-1]}.py"]}
with open(file_path, "w", encoding="utf-8") as f:
f.write(str(out))
def _load_templates(self):
for file in os.listdir(self.template_path):
@@ -147,7 +143,7 @@ class DesignerPluginGenerator:
if __name__ == "__main__": # pragma: no cover
# from bec_widgets.widgets.bec_queue.bec_queue import BECQueue
from bec_widgets.widgets.utility.spinner import SpinnerWidget
from bec_widgets.widgets.spinner.spinner import SpinnerWidget
generator = DesignerPluginGenerator(SpinnerWidget)
generator.run(validate=False)

View File

@@ -1,7 +1,5 @@
"""Module for a thin wrapper (LinearRegionWrapper) around the LinearRegionItem in pyqtgraph.
The class is mainly designed for usage with the BECWaveform and 1D plots."""
from __future__ import annotations
""" Module for a thin wrapper (LinearRegionWrapper) around the LinearRegionItem in pyqtgraph.
The class is mainly designed for usage with the BECWaveform and 1D plots. """
import pyqtgraph as pg
from qtpy.QtCore import QObject, Signal, Slot

View File

@@ -1,16 +0,0 @@
import re
def pascal_to_snake(name: str) -> str:
"""
Convert PascalCase to snake_case.
Args:
name (str): The name to be converted.
Returns:
str: The converted name.
"""
s1 = re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", name)
s2 = re.sub(r"([A-Z]+)([A-Z][a-z])", r"\1_\2", s1)
return s2.lower()

View File

@@ -148,7 +148,10 @@ class BECTickItem(BECIndicatorItem):
def cleanup(self) -> None:
"""Cleanup the item"""
self.remove_from_plot()
self.tick_item = None
if self.tick_item is not None:
self.tick_item.close()
self.tick_item.deleteLater()
self.tick_item = None
class BECArrowItem(BECIndicatorItem):
@@ -171,7 +174,7 @@ class BECArrowItem(BECIndicatorItem):
def __init__(self, plot_item: pg.PlotItem = None, parent=None):
super().__init__(plot_item=plot_item, parent=parent)
self.arrow_item = pg.ArrowItem()
self.arrow_item = pg.ArrowItem(parent=parent)
self.arrow_item.skip_auto_range = True
self._pos = (0, 0)
self.arrow_item.setVisible(False)

View File

@@ -1,10 +1,7 @@
from __future__ import annotations
import importlib
import inspect
import os
from dataclasses import dataclass
from typing import TYPE_CHECKING
from bec_lib.plugin_helper import _get_available_plugins
from qtpy.QtWidgets import QGraphicsWidget, QWidget
@@ -12,9 +9,6 @@ from qtpy.QtWidgets import QGraphicsWidget, QWidget
from bec_widgets.utils import BECConnector
from bec_widgets.utils.bec_widget import BECWidget
if TYPE_CHECKING: # pragma: no cover
from bec_widgets.widgets.containers.auto_update.auto_updates import AutoUpdates
def get_plugin_widgets() -> dict[str, BECConnector]:
"""
@@ -51,40 +45,6 @@ def _filter_plugins(obj):
return inspect.isclass(obj) and issubclass(obj, BECConnector)
def get_plugin_auto_updates() -> dict[str, type[AutoUpdates]]:
"""
Get all available auto update classes from the plugin directory. AutoUpdates must inherit from AutoUpdate and be
placed in the plugin repository's bec_widgets/auto_updates directory. The entry point for the auto updates is
specified in the respective pyproject.toml file using the following key:
[project.entry-points."bec.widgets.auto_updates"]
plugin_widgets_update = "<beamline_name>.bec_widgets.auto_updates"
e.g.
[project.entry-points."bec.widgets.auto_updates"]
plugin_widgets_update = "pxiii_bec.bec_widgets.auto_updates"
Returns:
dict[str, AutoUpdates]: A dictionary of widget names and their respective classes.
"""
modules = _get_available_plugins("bec.widgets.auto_updates")
loaded_plugins = {}
for module in modules:
mods = inspect.getmembers(module, predicate=_filter_auto_updates)
for name, mod_cls in mods:
if name in loaded_plugins:
print(f"Duplicated auto update {name}.")
loaded_plugins[name] = mod_cls
return loaded_plugins
def _filter_auto_updates(obj):
from bec_widgets.widgets.containers.auto_update.auto_updates import AutoUpdates
return (
inspect.isclass(obj) and issubclass(obj, AutoUpdates) and not obj.__name__ == "AutoUpdates"
)
@dataclass
class BECClassInfo:
name: str
@@ -93,18 +53,12 @@ class BECClassInfo:
obj: type
is_connector: bool = False
is_widget: bool = False
is_plugin: bool = False
is_top_level: bool = False
class BECClassContainer:
def __init__(self):
self._collection: list[BECClassInfo] = []
def __repr__(self):
return str(list(cl.name for cl in self.collection))
def __iter__(self):
return self._collection.__iter__()
self._collection = []
def add_class(self, class_info: BECClassInfo):
"""
@@ -134,14 +88,14 @@ class BECClassContainer:
"""
Get all top-level classes.
"""
return [info.obj for info in self.collection if info.is_plugin]
return [info.obj for info in self.collection if info.is_top_level]
@property
def plugins(self):
"""
Get all plugins. These are all classes that are on the top level and are widgets.
"""
return [info.obj for info in self.collection if info.is_widget and info.is_plugin]
return [info.obj for info in self.collection if info.is_widget and info.is_top_level]
@property
def widgets(self):
@@ -155,17 +109,10 @@ class BECClassContainer:
"""
Get all top-level classes that are RPC-enabled. These are all classes that users can choose from.
"""
return [info.obj for info in self.collection if info.is_plugin and info.is_connector]
@property
def classes(self):
"""
Get all classes.
"""
return [info.obj for info in self.collection]
return [info.obj for info in self.collection if info.is_top_level and info.is_connector]
def get_custom_classes(repo_name: str) -> BECClassContainer:
def get_rpc_classes(repo_name: str) -> BECClassContainer:
"""
Get all RPC-enabled classes in the specified repository.
@@ -206,8 +153,6 @@ def get_custom_classes(repo_name: str) -> BECClassContainer:
issubclass(obj, QWidget) or issubclass(obj, QGraphicsWidget)
):
class_info.is_top_level = True
if hasattr(obj, "PLUGIN") and obj.PLUGIN:
class_info.is_plugin = True
collection.add_class(class_info)
return collection

View File

@@ -1,157 +0,0 @@
import pyqtgraph as pg
from qtpy.QtCore import Property
from qtpy.QtWidgets import QApplication, QFrame, QHBoxLayout, QVBoxLayout, QWidget
from bec_widgets.widgets.utility.visual.dark_mode_button.dark_mode_button import DarkModeButton
class RoundedFrame(QFrame):
"""
A custom QFrame with rounded corners and optional theme updates.
The frame can contain any QWidget, however it is mainly designed to wrap PlotWidgets to provide a consistent look and feel with other BEC Widgets.
"""
def __init__(
self,
parent=None,
content_widget: QWidget = None,
background_color: str = None,
orientation: str = "horizontal",
radius: int = 10,
):
QFrame.__init__(self, parent)
self.background_color = background_color
self._radius = radius
# Apply rounded frame styling
self.setProperty("skip_settings", True)
self.setObjectName("roundedFrame")
# Create a layout for the frame
if orientation == "vertical":
self.layout = QVBoxLayout(self)
self.layout.setContentsMargins(5, 5, 5, 5)
else:
self.layout = QHBoxLayout(self)
self.layout.setContentsMargins(5, 5, 5, 5) # Set 5px margin
# Add the content widget to the layout
if content_widget:
self.layout.addWidget(content_widget)
# Store reference to the content widget
self.content_widget = content_widget
# Automatically apply initial styles to the GraphicalLayoutWidget if applicable
self.apply_plot_widget_style()
def apply_theme(self, theme: str):
"""
Apply the theme to the frame and its content if theme updates are enabled.
"""
if self.content_widget is not None and isinstance(
self.content_widget, pg.GraphicsLayoutWidget
):
self.content_widget.setBackground(self.background_color)
# Update background color based on the theme
if theme == "light":
self.background_color = "#e9ecef" # Subtle contrast for light mode
else:
self.background_color = "#141414" # Dark mode
self.update_style()
@Property(int)
def radius(self):
"""Radius of the rounded corners."""
return self._radius
@radius.setter
def radius(self, value: int):
self._radius = value
self.update_style()
def update_style(self):
"""
Update the style of the frame based on the background color.
"""
if self.background_color:
self.setStyleSheet(
f"""
QFrame#roundedFrame {{
background-color: {self.background_color};
border-radius: {self._radius}; /* Rounded corners */
}}
"""
)
self.apply_plot_widget_style()
def apply_plot_widget_style(self, border: str = "none"):
"""
Automatically apply background, border, and axis styles to the PlotWidget.
Args:
border (str): Border style (e.g., 'none', '1px solid red').
"""
if isinstance(self.content_widget, pg.GraphicsLayoutWidget):
# Apply border style via stylesheet
self.content_widget.setStyleSheet(
f"""
GraphicsLayoutWidget {{
border: {border}; /* Explicitly set the border */
}}
"""
)
self.content_widget.setBackground(self.background_color)
class ExampleApp(QWidget): # pragma: no cover
def __init__(self):
super().__init__()
self.setWindowTitle("Rounded Plots Example")
# Main layout
layout = QVBoxLayout(self)
dark_button = DarkModeButton()
# Create PlotWidgets
plot1 = pg.GraphicsLayoutWidget()
plot_item_1 = pg.PlotItem()
plot_item_1.plot([1, 3, 2, 4, 6, 5], pen="r")
plot1.plot_item = plot_item_1
plot2 = pg.GraphicsLayoutWidget()
plot_item_2 = pg.PlotItem()
plot_item_2.plot([1, 2, 4, 8, 16, 32], pen="r")
plot2.plot_item = plot_item_2
# Wrap PlotWidgets in RoundedFrame
rounded_plot1 = RoundedFrame(parent=self, content_widget=plot1)
rounded_plot2 = RoundedFrame(parent=self, content_widget=plot2)
# Add to layout
layout.addWidget(dark_button)
layout.addWidget(rounded_plot1)
layout.addWidget(rounded_plot2)
self.setLayout(layout)
from qtpy.QtCore import QTimer
def change_theme():
rounded_plot1.apply_theme("light")
rounded_plot2.apply_theme("dark")
QTimer.singleShot(100, change_theme)
if __name__ == "__main__": # pragma: no cover
app = QApplication([])
window = ExampleApp()
window.show()
app.exec()

View File

@@ -1,277 +0,0 @@
from __future__ import annotations
import functools
import traceback
import types
from contextlib import contextmanager
from typing import TYPE_CHECKING, Callable, TypeVar
from bec_lib.client import BECClient
from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from bec_lib.utils.import_utils import lazy_import
from qtpy.QtCore import QTimer
from qtpy.QtWidgets import QApplication
from redis.exceptions import RedisError
from bec_widgets.cli.rpc.rpc_register import RPCRegister
from bec_widgets.utils import BECDispatcher
from bec_widgets.utils.bec_connector import BECConnector
from bec_widgets.utils.error_popups import ErrorPopupUtility
from bec_widgets.widgets.containers.main_window.main_window import BECMainWindow
if TYPE_CHECKING: # pragma: no cover
from bec_lib import messages
from qtpy.QtCore import QObject
else:
messages = lazy_import("bec_lib.messages")
logger = bec_logger.logger
T = TypeVar("T")
@contextmanager
def rpc_exception_hook(err_func):
"""This context replaces the popup message box for error display with a specific hook"""
# get error popup utility singleton
popup = ErrorPopupUtility()
# save current setting
old_exception_hook = popup.custom_exception_hook
# install err_func, if it is a callable
# IMPORTANT, Keep self here, because this method is overwriting the custom_exception_hook
# of the ErrorPopupUtility (popup instance) class.
def custom_exception_hook(self, exc_type, value, tb, **kwargs):
err_func({"error": popup.get_error_message(exc_type, value, tb)})
popup.custom_exception_hook = types.MethodType(custom_exception_hook, popup)
try:
yield popup
finally:
# restore state of error popup utility singleton
popup.custom_exception_hook = old_exception_hook
class RPCServer:
client: BECClient
def __init__(
self,
gui_id: str,
dispatcher: BECDispatcher | None = None,
client: BECClient | None = None,
config=None,
gui_class_id: str = "bec",
) -> None:
self.status = messages.BECStatus.BUSY
self.dispatcher = BECDispatcher(config=config) if dispatcher is None else dispatcher
self.client = self.dispatcher.client if client is None else client
self.client.start()
self.gui_id = gui_id
# register broadcast callback
self.rpc_register = RPCRegister()
self.rpc_register.add_callback(self.broadcast_registry_update)
self.dispatcher.connect_slot(
self.on_rpc_update, MessageEndpoints.gui_instructions(self.gui_id)
)
# Setup QTimer for heartbeat
self._heartbeat_timer = QTimer()
self._heartbeat_timer.timeout.connect(self.emit_heartbeat)
self._heartbeat_timer.start(200)
self._registry_update_callbacks = []
self._broadcasted_data = {}
self.status = messages.BECStatus.RUNNING
logger.success(f"Server started with gui_id: {self.gui_id}")
def on_rpc_update(self, msg: dict, metadata: dict):
request_id = metadata.get("request_id")
if request_id is None:
logger.error("Received RPC instruction without request_id")
return
logger.debug(f"Received RPC instruction: {msg}, metadata: {metadata}")
with rpc_exception_hook(functools.partial(self.send_response, request_id, False)):
try:
obj = self.get_object_from_config(msg["parameter"])
method = msg["action"]
args = msg["parameter"].get("args", [])
kwargs = msg["parameter"].get("kwargs", {})
res = self.run_rpc(obj, method, args, kwargs)
except Exception:
content = traceback.format_exc()
logger.error(f"Error while executing RPC instruction: {content}")
self.send_response(request_id, False, {"error": content})
else:
logger.debug(f"RPC instruction executed successfully: {res}")
self.send_response(request_id, True, {"result": res})
def send_response(self, request_id: str, accepted: bool, msg: dict):
self.client.connector.set_and_publish(
MessageEndpoints.gui_instruction_response(request_id),
messages.RequestResponseMessage(accepted=accepted, message=msg),
expire=60,
)
def get_object_from_config(self, config: dict):
gui_id = config.get("gui_id")
obj = self.rpc_register.get_rpc_by_id(gui_id)
if obj is None:
raise ValueError(f"Object with gui_id {gui_id} not found")
return obj
def run_rpc(self, obj, method, args, kwargs):
# Run with rpc registry broadcast, but only once
with RPCRegister.delayed_broadcast():
logger.debug(f"Running RPC instruction: {method} with args: {args}, kwargs: {kwargs}")
method_obj = getattr(obj, method)
# check if the method accepts args and kwargs
if not callable(method_obj):
if not args:
res = method_obj
else:
setattr(obj, method, args[0])
res = None
else:
res = method_obj(*args, **kwargs)
if isinstance(res, list):
res = [self.serialize_object(obj) for obj in res]
elif isinstance(res, dict):
res = {key: self.serialize_object(val) for key, val in res.items()}
else:
res = self.serialize_object(res)
return res
def serialize_object(self, obj: T) -> None | dict | T:
"""
Serialize all BECConnector objects.
Args:
obj: The object to be serialized.
Returns:
None | dict | T: The serialized object or None if the object is not a BECConnector.
"""
if not isinstance(obj, BECConnector):
return obj
# Respect RPC = False
if getattr(obj, "RPC", True) is False:
return None
return self._serialize_bec_connector(obj, wait=True)
def emit_heartbeat(self) -> None:
"""
Emit a heartbeat message to the GUI server.
This method is called periodically to indicate that the server is still running.
"""
logger.trace(f"Emitting heartbeat for {self.gui_id}")
try:
self.client.connector.set(
MessageEndpoints.gui_heartbeat(self.gui_id),
messages.StatusMessage(name=self.gui_id, status=self.status, info={}),
expire=10,
)
except RedisError as exc:
logger.error(f"Error while emitting heartbeat: {exc}")
def broadcast_registry_update(self, connections: dict) -> None:
"""
Broadcast the registry update to all the callbacks.
This method is called whenever the registry is updated.
"""
data = {}
for key, val in connections.items():
if not isinstance(val, BECConnector):
continue
if not getattr(val, "RPC", True):
continue
data[key] = self._serialize_bec_connector(val)
if self._broadcasted_data == data:
return
self._broadcasted_data = data
logger.info(f"Broadcasting registry update: {data} for {self.gui_id}")
self.client.connector.xadd(
MessageEndpoints.gui_registry_state(self.gui_id),
msg_dict={"data": messages.GUIRegistryStateMessage(state=data)},
max_size=1,
)
def _serialize_bec_connector(self, connector: BECConnector, wait=False) -> dict:
"""
Create the serialization dict for a single BECConnector.
Args:
connector (BECConnector): The BECConnector to serialize.
wait (bool): If True, wait until the object is registered in the RPC register.
Returns:
dict: The serialized BECConnector object.
"""
config_dict = connector.config.model_dump()
config_dict["parent_id"] = getattr(connector, "parent_id", None)
try:
parent = connector.parent()
if isinstance(parent, BECMainWindow):
container_proxy = parent.gui_id
else:
container_proxy = None
except Exception:
container_proxy = None
if wait:
while not self.rpc_register.object_is_registered(connector):
QApplication.processEvents()
widget_class = getattr(connector, "rpc_widget_class", None)
if not widget_class:
widget_class = connector.__class__.__name__
return {
"gui_id": connector.gui_id,
"object_name": connector.object_name or connector.__class__.__name__,
"widget_class": widget_class,
"config": config_dict,
"container_proxy": container_proxy,
"__rpc__": True,
}
@staticmethod
def _get_becwidget_ancestor(widget: QObject) -> BECConnector | None:
"""
Traverse up the parent chain to find the nearest BECConnector.
Returns None if none is found.
"""
parent = widget.parent()
while parent is not None:
if isinstance(parent, BECConnector):
return parent
parent = parent.parent()
return None
# Suppose clients register callbacks to receive updates
def add_registry_update_callback(self, cb: Callable) -> None:
"""
Add a callback to be called whenever the registry is updated.
The specified callback is called whenever the registry is updated.
Args:
cb (Callable): The callback to be added. It should accept a dictionary of all the
registered RPC objects as an argument.
"""
self._registry_update_callbacks.append(cb)
def shutdown(self): # TODO not sure if needed when cleanup is done at level of BECConnector
self.status = messages.BECStatus.IDLE
self._heartbeat_timer.stop()
self.emit_heartbeat()
logger.info("Succeded in shutting down CLI server")
self.client.shutdown()

View File

@@ -1,44 +0,0 @@
from bec_lib.serialization import msgpack
from qtpy.QtCore import QPointF
def register_serializer_extension():
"""
Register the serializer extension for the BECConnector.
"""
if not module_is_registered("bec_widgets.utils.serialization"):
msgpack.register_object_hook(encode_qpointf, decode_qpointf)
def module_is_registered(module_name: str) -> bool:
"""
Check if the module is registered in the encoder.
Args:
module_name (str): The name of the module to check.
Returns:
bool: True if the module is registered, False otherwise.
"""
# pylint: disable=protected-access
for enc in msgpack._encoder:
if enc[0].__module__ == module_name:
return True
return False
def encode_qpointf(obj):
"""
Encode a QPointF object to a list of floats. As this is mostly used for sending
data to the client, it is not necessary to convert it back to a QPointF object.
"""
if isinstance(obj, QPointF):
return [obj.x(), obj.y()]
return obj
def decode_qpointf(obj):
"""
no-op function since QPointF is encoded as a list of floats.
"""
return obj

View File

@@ -1,376 +0,0 @@
import sys
from typing import Literal, Optional
from qtpy.QtCore import Property, QEasingCurve, QPropertyAnimation
from qtpy.QtGui import QAction
from qtpy.QtWidgets import (
QApplication,
QFrame,
QHBoxLayout,
QLabel,
QMainWindow,
QScrollArea,
QSizePolicy,
QStackedWidget,
QVBoxLayout,
QWidget,
)
from bec_widgets.utils.toolbar import MaterialIconAction, ModularToolBar
class SidePanel(QWidget):
"""
Side panel widget that can be placed on the left, right, top, or bottom of the main widget.
"""
def __init__(
self,
parent=None,
orientation: Literal["left", "right", "top", "bottom"] = "left",
panel_max_width: int = 200,
animation_duration: int = 200,
animations_enabled: bool = True,
):
super().__init__(parent=parent)
self.setProperty("skip_settings", True)
self._orientation = orientation
self._panel_max_width = panel_max_width
self._animation_duration = animation_duration
self._animations_enabled = animations_enabled
self._panel_width = 0
self._panel_height = 0
self.panel_visible = False
self.current_action: Optional[QAction] = None
self.current_index: Optional[int] = None
self.switching_actions = False
self._init_ui()
def _init_ui(self):
"""
Initialize the UI elements.
"""
if self._orientation in ("left", "right"):
self.main_layout = QHBoxLayout(self)
self.main_layout.setContentsMargins(0, 0, 0, 0)
self.main_layout.setSpacing(0)
self.toolbar = ModularToolBar(parent=self, target_widget=self, orientation="vertical")
self.container = QWidget()
self.container.layout = QVBoxLayout(self.container)
self.container.layout.setContentsMargins(0, 0, 0, 0)
self.container.layout.setSpacing(0)
self.stack_widget = QStackedWidget()
self.stack_widget.setSizePolicy(QSizePolicy.Fixed, QSizePolicy.Expanding)
self.stack_widget.setMinimumWidth(5)
self.stack_widget.setMaximumWidth(self._panel_max_width)
if self._orientation == "left":
self.main_layout.addWidget(self.toolbar)
self.main_layout.addWidget(self.container)
else:
self.main_layout.addWidget(self.container)
self.main_layout.addWidget(self.toolbar)
self.container.layout.addWidget(self.stack_widget)
self.menu_anim = QPropertyAnimation(self, b"panel_width")
self.setSizePolicy(QSizePolicy.Fixed, QSizePolicy.Expanding)
self.panel_width = 0 # start hidden
else:
self.main_layout = QVBoxLayout(self)
self.main_layout.setContentsMargins(0, 0, 0, 0)
self.main_layout.setSpacing(0)
self.toolbar = ModularToolBar(parent=self, target_widget=self, orientation="horizontal")
self.container = QWidget()
self.container.layout = QVBoxLayout(self.container)
self.container.layout.setContentsMargins(0, 0, 0, 0)
self.container.layout.setSpacing(0)
self.stack_widget = QStackedWidget()
self.stack_widget.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Fixed)
self.stack_widget.setMinimumHeight(5)
self.stack_widget.setMaximumHeight(self._panel_max_width)
if self._orientation == "top":
self.main_layout.addWidget(self.toolbar)
self.main_layout.addWidget(self.container)
else:
self.main_layout.addWidget(self.container)
self.main_layout.addWidget(self.toolbar)
self.container.layout.addWidget(self.stack_widget)
self.menu_anim = QPropertyAnimation(self, b"panel_height")
self.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Fixed)
self.panel_height = 0 # start hidden
self.menu_anim.setDuration(self._animation_duration)
self.menu_anim.setEasingCurve(QEasingCurve.InOutQuad)
@Property(int)
def panel_width(self):
"""Get the panel width."""
return self._panel_width
@panel_width.setter
def panel_width(self, width: int):
"""Set the panel width."""
self._panel_width = width
if self._orientation in ("left", "right"):
self.stack_widget.setFixedWidth(width)
@Property(int)
def panel_height(self):
"""Get the panel height."""
return self._panel_height
@panel_height.setter
def panel_height(self, height: int):
"""Set the panel height."""
self._panel_height = height
if self._orientation in ("top", "bottom"):
self.stack_widget.setFixedHeight(height)
@Property(int)
def panel_max_width(self):
"""Get the maximum width of the panel."""
return self._panel_max_width
@panel_max_width.setter
def panel_max_width(self, size: int):
"""Set the maximum width of the panel."""
self._panel_max_width = size
if self._orientation in ("left", "right"):
self.stack_widget.setMaximumWidth(self._panel_max_width)
else:
self.stack_widget.setMaximumHeight(self._panel_max_width)
@Property(int)
def animation_duration(self):
"""Get the duration of the animation."""
return self._animation_duration
@animation_duration.setter
def animation_duration(self, duration: int):
"""Set the duration of the animation."""
self._animation_duration = duration
self.menu_anim.setDuration(duration)
@Property(bool)
def animations_enabled(self):
"""Get the status of the animations."""
return self._animations_enabled
@animations_enabled.setter
def animations_enabled(self, enabled: bool):
"""Set the status of the animations."""
self._animations_enabled = enabled
def show_panel(self, idx: int):
"""
Show the side panel with animation and switch to idx.
"""
self.stack_widget.setCurrentIndex(idx)
self.panel_visible = True
self.current_index = idx
if self._orientation in ("left", "right"):
start_val, end_val = 0, self._panel_max_width
else:
start_val, end_val = 0, self._panel_max_width
if self._animations_enabled:
self.menu_anim.stop()
self.menu_anim.setStartValue(start_val)
self.menu_anim.setEndValue(end_val)
self.menu_anim.start()
else:
if self._orientation in ("left", "right"):
self.panel_width = end_val
else:
self.panel_height = end_val
def hide_panel(self):
"""
Hide the side panel with animation.
"""
self.panel_visible = False
self.current_index = None
if self._orientation in ("left", "right"):
start_val, end_val = self._panel_max_width, 0
else:
start_val, end_val = self._panel_max_width, 0
if self._animations_enabled:
self.menu_anim.stop()
self.menu_anim.setStartValue(start_val)
self.menu_anim.setEndValue(end_val)
self.menu_anim.start()
else:
if self._orientation in ("left", "right"):
self.panel_width = end_val
else:
self.panel_height = end_val
def switch_to(self, idx: int):
"""
Switch to the specified index without animation.
"""
if self.current_index != idx:
self.stack_widget.setCurrentIndex(idx)
self.current_index = idx
def add_menu(
self,
action_id: str,
icon_name: str,
tooltip: str,
widget: QWidget,
title: str | None = None,
):
"""
Add a menu to the side panel.
Args:
action_id(str): The ID of the action.
icon_name(str): The name of the icon.
tooltip(str): The tooltip for the action.
widget(QWidget): The widget to add to the panel.
title(str): The title of the panel.
"""
# container_widget: top-level container for the stacked page
container_widget = QWidget()
container_layout = QVBoxLayout(container_widget)
container_layout.setContentsMargins(0, 0, 0, 0)
container_layout.setSpacing(5)
if title is not None:
title_label = QLabel(f"<b>{title}</b>")
title_label.setStyleSheet("font-size: 16px;")
container_layout.addWidget(title_label)
# Create a QScrollArea for the actual widget to ensure scrolling if the widget inside is too large
scroll_area = QScrollArea()
scroll_area.setFrameShape(QFrame.NoFrame)
scroll_area.setWidgetResizable(True)
# Let the scroll area expand in both directions if there's room
scroll_area.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
scroll_area.setWidget(widget)
# Put the scroll area in the container layout
container_layout.addWidget(scroll_area)
# Optionally stretch the scroll area to fill vertical space
container_layout.setStretchFactor(scroll_area, 1)
# Add container_widget to the stacked widget
index = self.stack_widget.count()
self.stack_widget.addWidget(container_widget)
# Add an action to the toolbar
action = MaterialIconAction(icon_name=icon_name, tooltip=tooltip, checkable=True)
self.toolbar.add_action(action_id, action, target_widget=self)
def on_action_toggled(checked: bool):
if self.switching_actions:
return
if checked:
if self.current_action and self.current_action != action.action:
self.switching_actions = True
self.current_action.setChecked(False)
self.switching_actions = False
self.current_action = action.action
if not self.panel_visible:
self.show_panel(index)
else:
self.switch_to(index)
else:
if self.current_action == action.action:
self.current_action = None
self.hide_panel()
action.action.toggled.connect(on_action_toggled)
############################################
# DEMO APPLICATION
############################################
class ExampleApp(QMainWindow): # pragma: no cover
def __init__(self):
super().__init__()
self.setWindowTitle("Side Panel Example")
central_widget = QWidget()
self.setCentralWidget(central_widget)
self.layout = QHBoxLayout(central_widget)
# Create side panel
self.side_panel = SidePanel(self, orientation="left", panel_max_width=250)
self.layout.addWidget(self.side_panel)
from bec_widgets.widgets.plots.waveform.waveform import Waveform
self.plot = Waveform()
self.layout.addWidget(self.plot)
self.add_side_menus()
def add_side_menus(self):
widget1 = QWidget()
layout1 = QVBoxLayout(widget1)
for i in range(15):
layout1.addWidget(QLabel(f"Widget 1 label row {i}"))
self.side_panel.add_menu(
action_id="widget1",
icon_name="counter_1",
tooltip="Show Widget 1",
widget=widget1,
title="Widget 1 Panel",
)
widget2 = QWidget()
layout2 = QVBoxLayout(widget2)
layout2.addWidget(QLabel("Short widget 2 content"))
self.side_panel.add_menu(
action_id="widget2",
icon_name="counter_2",
tooltip="Show Widget 2",
widget=widget2,
title="Widget 2 Panel",
)
widget3 = QWidget()
layout3 = QVBoxLayout(widget3)
for i in range(10):
layout3.addWidget(QLabel(f"Line {i} for Widget 3"))
self.side_panel.add_menu(
action_id="widget3",
icon_name="counter_3",
tooltip="Show Widget 3",
widget=widget3,
title="Widget 3 Panel",
)
if __name__ == "__main__": # pragma: no cover
app = QApplication(sys.argv)
window = ExampleApp()
window.resize(1000, 700)
window.show()
sys.exit(app.exec())

File diff suppressed because it is too large Load Diff

View File

@@ -1,17 +1,16 @@
from bec_lib.logger import bec_logger
from qtpy import PYQT6, PYSIDE6
import os
from qtpy import PYQT6, PYSIDE6, QT_VERSION
from qtpy.QtCore import QFile, QIODevice
from bec_widgets.utils.generate_designer_plugin import DesignerPluginInfo
from bec_widgets.utils.plugin_utils import get_custom_classes
logger = bec_logger.logger
from bec_widgets.utils.plugin_utils import get_rpc_classes
if PYSIDE6:
from PySide6.QtUiTools import QUiLoader
class CustomUiLoader(QUiLoader):
def __init__(self, baseinstance, custom_widgets: dict | None = None):
def __init__(self, baseinstance, custom_widgets: dict = None):
super().__init__(baseinstance)
self.custom_widgets = custom_widgets or {}
@@ -19,9 +18,10 @@ if PYSIDE6:
def createWidget(self, class_name, parent=None, name=""):
if class_name in self.custom_widgets:
widget = self.custom_widgets[class_name](self.baseinstance)
widget = self.custom_widgets[class_name](parent)
widget.setObjectName(name)
return widget
return super().createWidget(class_name, self.baseinstance, name)
return super().createWidget(class_name, parent, name)
class UILoader:
@@ -30,7 +30,7 @@ class UILoader:
def __init__(self, parent=None):
self.parent = parent
widgets = get_custom_classes("bec_widgets").classes
widgets = get_rpc_classes("bec_widgets").top_level_classes
self.custom_widgets = {widget.__name__: widget for widget in widgets}
@@ -51,7 +51,7 @@ class UILoader:
Returns:
QWidget: The loaded widget.
"""
parent = parent or self.parent
loader = CustomUiLoader(parent, self.custom_widgets)
file = QFile(ui_file)
if not file.open(QIODevice.ReadOnly):

View File

@@ -1,9 +1,7 @@
# pylint: disable=no-name-in-module
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import Literal
import shiboken6 as shb
from qtpy.QtWidgets import (
QApplication,
QCheckBox,
@@ -11,7 +9,6 @@ from qtpy.QtWidgets import (
QDoubleSpinBox,
QLabel,
QLineEdit,
QSlider,
QSpinBox,
QTableWidget,
QTableWidgetItem,
@@ -19,8 +16,6 @@ from qtpy.QtWidgets import (
QWidget,
)
from bec_widgets.widgets.utility.toggle.toggle import ToggleSwitch
class WidgetHandler(ABC):
"""Abstract base class for all widget handlers."""
@@ -33,15 +28,6 @@ class WidgetHandler(ABC):
def set_value(self, widget: QWidget, value):
"""Set a value on the widget instance."""
def connect_change_signal(self, widget: QWidget, slot):
"""
Connect a change signal from this widget to the given slot.
If the widget type doesn't have a known "value changed" signal, do nothing.
slot: a function accepting two arguments (widget, value)
"""
pass
class LineEditHandler(WidgetHandler):
"""Handler for QLineEdit widgets."""
@@ -52,9 +38,6 @@ class LineEditHandler(WidgetHandler):
def set_value(self, widget: QLineEdit, value: str) -> None:
widget.setText(value)
def connect_change_signal(self, widget: QLineEdit, slot):
widget.textChanged.connect(lambda text, w=widget: slot(w, text))
class ComboBoxHandler(WidgetHandler):
"""Handler for QComboBox widgets."""
@@ -70,11 +53,6 @@ class ComboBoxHandler(WidgetHandler):
if isinstance(value, int):
widget.setCurrentIndex(value)
def connect_change_signal(self, widget: QComboBox, slot):
# currentIndexChanged(int) or currentIndexChanged(str) both possible.
# We use currentIndexChanged(int) for a consistent behavior.
widget.currentIndexChanged.connect(lambda idx, w=widget: slot(w, self.get_value(w)))
class TableWidgetHandler(WidgetHandler):
"""Handler for QTableWidget widgets."""
@@ -94,84 +72,39 @@ class TableWidgetHandler(WidgetHandler):
item = QTableWidgetItem(str(cell_value))
widget.setItem(row, col, item)
def connect_change_signal(self, widget: QTableWidget, slot):
# If desired, we could connect cellChanged(row, col) and then fetch all data.
# This might be noisy if table is large.
# For demonstration, connect cellChanged to update entire table value.
def on_cell_changed(row, col, w=widget):
val = self.get_value(w)
slot(w, val)
widget.cellChanged.connect(on_cell_changed)
class SpinBoxHandler(WidgetHandler):
"""Handler for QSpinBox and QDoubleSpinBox widgets."""
def get_value(self, widget: QSpinBox | QDoubleSpinBox, **kwargs):
def get_value(self, widget, **kwargs):
return widget.value()
def set_value(self, widget: QSpinBox | QDoubleSpinBox, value):
def set_value(self, widget, value):
widget.setValue(value)
def connect_change_signal(self, widget: QSpinBox | QDoubleSpinBox, slot):
widget.valueChanged.connect(lambda val, w=widget: slot(w, val))
class CheckBoxHandler(WidgetHandler):
"""Handler for QCheckBox widgets."""
def get_value(self, widget: QCheckBox, **kwargs):
def get_value(self, widget, **kwargs):
return widget.isChecked()
def set_value(self, widget: QCheckBox, value):
def set_value(self, widget, value):
widget.setChecked(value)
def connect_change_signal(self, widget: QCheckBox, slot):
widget.toggled.connect(lambda val, w=widget: slot(w, val))
class SlideHandler(WidgetHandler):
"""Handler for QCheckBox widgets."""
def get_value(self, widget: QSlider, **kwargs):
return widget.value()
def set_value(self, widget: QSlider, value):
widget.setValue(value)
def connect_change_signal(self, widget: QSlider, slot):
widget.valueChanged.connect(lambda val, w=widget: slot(w, val))
class ToggleSwitchHandler(WidgetHandler):
"""Handler for ToggleSwitch widgets."""
def get_value(self, widget: ToggleSwitch, **kwargs):
return widget.checked
def set_value(self, widget: ToggleSwitch, value):
widget.checked = value
def connect_change_signal(self, widget: ToggleSwitch, slot):
widget.enabled.connect(lambda val, w=widget: slot(w, val))
class LabelHandler(WidgetHandler):
"""Handler for QLabel widgets."""
def get_value(self, widget: QLabel, **kwargs):
def get_value(self, widget, **kwargs):
return widget.text()
def set_value(self, widget: QLabel, value):
def set_value(self, widget, value):
widget.setText(value)
# QLabel typically doesn't have user-editable changes. No signal to connect.
# If needed, this can remain empty.
class WidgetIO:
"""Public interface for getting, setting values and connecting signals using handler mapping"""
"""Public interface for getting and setting values using handler mapping"""
_handlers = {
QLineEdit: LineEditHandler,
@@ -181,8 +114,6 @@ class WidgetIO:
QDoubleSpinBox: SpinBoxHandler,
QCheckBox: CheckBoxHandler,
QLabel: LabelHandler,
ToggleSwitch: ToggleSwitchHandler,
QSlider: SlideHandler,
}
@staticmethod
@@ -217,17 +148,6 @@ class WidgetIO:
elif not ignore_errors:
raise ValueError(f"No handler for widget type: {type(widget)}")
@staticmethod
def connect_widget_change_signal(widget, slot):
"""
Connect the widget's value-changed signal to a generic slot function (widget, value).
This now delegates the logic to the widget's handler.
"""
handler_class = WidgetIO._find_handler(widget)
if handler_class:
handler = handler_class()
handler.connect_change_signal(widget, slot)
@staticmethod
def check_and_adjust_limits(spin_box: QDoubleSpinBox, number: float):
"""
@@ -276,162 +196,39 @@ class WidgetHierarchy:
grab_values: bool = False,
prefix: str = "",
exclude_internal_widgets: bool = True,
only_bec_widgets: bool = False,
show_parent: bool = True,
) -> None:
"""
Print the widget hierarchy to the console.
Args:
widget: Widget to print the hierarchy of.
widget: Widget to print the hierarchy of
indent(int, optional): Level of indentation.
grab_values(bool,optional): Whether to grab the values of the widgets.
prefix(str,optional): Custom string prefix for indentation.
prefix(stc,optional): Custom string prefix for indentation.
exclude_internal_widgets(bool,optional): Whether to exclude internal widgets (e.g. QComboBox in PyQt6).
only_bec_widgets(bool, optional): Whether to print only widgets that are instances of BECWidget.
show_parent(bool, optional): Whether to display which BECWidget is the parent of each discovered BECWidget.
"""
from bec_widgets.utils import BECConnector
from bec_widgets.widgets.plots.waveform.waveform import Waveform
widget_info = f"{widget.__class__.__name__} ({widget.objectName()})"
if grab_values:
value = WidgetIO.get_value(widget, ignore_errors=True)
value_str = f" [value: {value}]" if value is not None else ""
widget_info += value_str
# 1) Filter out widgets that are not BECConnectors (if 'only_bec_widgets' is True)
is_bec = isinstance(widget, BECConnector)
if only_bec_widgets and not is_bec:
return
# 2) Determine and print the parent's info (closest BECConnector)
parent_info = ""
if show_parent and is_bec:
ancestor = WidgetHierarchy._get_becwidget_ancestor(widget)
if ancestor:
parent_label = ancestor.objectName() or ancestor.__class__.__name__
parent_info = f" parent={parent_label}"
else:
parent_info = " parent=None"
widget_info = f"{widget.__class__.__name__} ({widget.objectName()}){parent_info}"
print(prefix + widget_info)
# 3) If it's a Waveform, explicitly print the curves
if isinstance(widget, Waveform):
for curve in widget.curves:
curve_prefix = prefix + " └─ "
print(
f"{curve_prefix}{curve.__class__.__name__} ({curve.objectName()}) "
f"parent={widget.objectName()}"
)
# 4) Recursively handle each child if:
# - It's a QWidget
# - It is a BECConnector (or we don't care about filtering)
# - Its closest BECConnector parent is the current widget
for child in widget.findChildren(QWidget):
if only_bec_widgets and not isinstance(child, BECConnector):
children = widget.children()
for child in children:
if (
exclude_internal_widgets
and isinstance(widget, QComboBox)
and child.__class__.__name__ in ["QFrame", "QBoxLayout", "QListView"]
):
continue
# if WidgetHierarchy._get_becwidget_ancestor(child) == widget:
child_prefix = prefix + " └─ "
child_prefix = prefix + " "
arrow = "├─ " if child != children[-1] else "└─ "
WidgetHierarchy.print_widget_hierarchy(
child,
indent=indent + 1,
grab_values=grab_values,
prefix=child_prefix,
exclude_internal_widgets=exclude_internal_widgets,
only_bec_widgets=only_bec_widgets,
show_parent=show_parent,
child, indent + 1, grab_values, prefix=child_prefix + arrow
)
@staticmethod
def print_becconnector_hierarchy_from_app():
"""
Enumerate ALL BECConnector objects in the QApplication.
Also detect if a widget is a PlotBase, and add any data items
(PlotDataItem-like) that are also BECConnector objects.
Build a parent->children graph where each child's 'parent'
is its closest BECConnector ancestor. Print the entire hierarchy
from the root(s).
The result is a single, consolidated tree for your entire
running GUI, including PlotBase data items that are BECConnector.
"""
import sys
from collections import defaultdict
from qtpy.QtWidgets import QApplication
from bec_widgets.utils import BECConnector
from bec_widgets.widgets.plots.plot_base import PlotBase
# 1) Gather ALL QWidget-based BECConnector objects
all_qwidgets = QApplication.allWidgets()
bec_widgets = set(w for w in all_qwidgets if isinstance(w, BECConnector))
# 2) Also gather any BECConnector-based data items from PlotBase widgets
for w in all_qwidgets:
if isinstance(w, PlotBase) and hasattr(w, "plot_item"):
plot_item = w.plot_item
if hasattr(plot_item, "listDataItems"):
for data_item in plot_item.listDataItems():
if isinstance(data_item, BECConnector):
bec_widgets.add(data_item)
# 3) Build a map of (closest BECConnector parent) -> list of children
parent_map = defaultdict(list)
for w in bec_widgets:
parent_bec = WidgetHierarchy._get_becwidget_ancestor(w)
parent_map[parent_bec].append(w)
# 4) Define a recursive printer to show each object's children
def print_tree(parent, prefix=""):
children = parent_map[parent]
for i, child in enumerate(children):
connector_class = child.__class__.__name__
connector_name = child.objectName() or connector_class
if parent is None:
parent_label = "None"
else:
parent_label = parent.objectName() or parent.__class__.__name__
line = f"{connector_class} ({connector_name}) parent={parent_label}"
# Determine tree-branch symbols
is_last = i == len(children) - 1
branch_str = "└─ " if is_last else "├─ "
print(prefix + branch_str + line)
# Recurse deeper
next_prefix = prefix + (" " if is_last else "")
print_tree(child, prefix=next_prefix)
# 5) Print top-level items (roots) whose BECConnector parent is None
roots = parent_map[None]
for r_i, root in enumerate(roots):
root_class = root.__class__.__name__
root_name = root.objectName() or root_class
line = f"{root_class} ({root_name}) parent=None"
is_last_root = r_i == len(roots) - 1
print(line)
# Recurse into its children
print_tree(root, prefix=" ")
@staticmethod
def _get_becwidget_ancestor(widget):
"""
Traverse up the parent chain to find the nearest BECConnector.
Returns None if none is found.
"""
from bec_widgets.utils import BECConnector
if not shb.isValid(widget):
return None
parent = widget.parent()
while parent is not None:
if isinstance(parent, BECConnector):
return parent
parent = parent.parent()
return None
@staticmethod
def export_config_to_dict(
widget: QWidget,
@@ -512,8 +309,8 @@ class WidgetHierarchy:
WidgetHierarchy.import_config_from_dict(child, widget_config, set_values)
# Example usage
def hierarchy_example(): # pragma: no cover
# Example application to demonstrate the usage of the functions
if __name__ == "__main__": # pragma: no cover
app = QApplication([])
# Create instance of WidgetHierarchy
@@ -568,37 +365,3 @@ def hierarchy_example(): # pragma: no cover
print(f"Config dict new REDUCED: {config_dict_new_reduced}")
app.exec()
def widget_io_signal_example(): # pragma: no cover
app = QApplication([])
main_widget = QWidget()
layout = QVBoxLayout(main_widget)
line_edit = QLineEdit(main_widget)
combo_box = QComboBox(main_widget)
spin_box = QSpinBox(main_widget)
combo_box.addItems(["Option 1", "Option 2", "Option 3"])
layout.addWidget(line_edit)
layout.addWidget(combo_box)
layout.addWidget(spin_box)
main_widget.show()
def universal_slot(w, val):
print(f"Widget {w.objectName() or w} changed, new value: {val}")
# Connect all supported widgets through their handlers
WidgetIO.connect_widget_change_signal(line_edit, universal_slot)
WidgetIO.connect_widget_change_signal(combo_box, universal_slot)
WidgetIO.connect_widget_change_signal(spin_box, universal_slot)
app.exec_()
if __name__ == "__main__": # pragma: no cover
# Change example function to test different scenarios
# hierarchy_example()
widget_io_signal_example()

View File

@@ -1,223 +0,0 @@
from __future__ import annotations
from bec_lib import bec_logger
from qtpy.QtCore import QSettings
from qtpy.QtWidgets import (
QApplication,
QCheckBox,
QFileDialog,
QHBoxLayout,
QLabel,
QLineEdit,
QPushButton,
QSpinBox,
QVBoxLayout,
QWidget,
)
logger = bec_logger.logger
class WidgetStateManager:
"""
A class to manage the state of a widget by saving and loading the state to and from a INI file.
Args:
widget(QWidget): The widget to manage the state for.
"""
def __init__(self, widget):
self.widget = widget
def save_state(self, filename: str = None):
"""
Save the state of the widget to an INI file.
Args:
filename(str): The filename to save the state to.
"""
if not filename:
filename, _ = QFileDialog.getSaveFileName(
self.widget, "Save Settings", "", "INI Files (*.ini)"
)
if filename:
settings = QSettings(filename, QSettings.IniFormat)
self._save_widget_state_qsettings(self.widget, settings)
def load_state(self, filename: str = None):
"""
Load the state of the widget from an INI file.
Args:
filename(str): The filename to load the state from.
"""
if not filename:
filename, _ = QFileDialog.getOpenFileName(
self.widget, "Load Settings", "", "INI Files (*.ini)"
)
if filename:
settings = QSettings(filename, QSettings.IniFormat)
self._load_widget_state_qsettings(self.widget, settings)
def _save_widget_state_qsettings(self, widget: QWidget, settings: QSettings):
"""
Save the state of the widget to QSettings.
Args:
widget(QWidget): The widget to save the state for.
settings(QSettings): The QSettings object to save the state to.
"""
if widget.property("skip_settings") is True:
return
meta = widget.metaObject()
widget_name = self._get_full_widget_name(widget)
settings.beginGroup(widget_name)
for i in range(meta.propertyCount()):
prop = meta.property(i)
name = prop.name()
if (
name == "objectName"
or not prop.isReadable()
or not prop.isWritable()
or not prop.isStored() # can be extended to fine filter
):
continue
value = widget.property(name)
settings.setValue(name, value)
settings.endGroup()
# Recursively process children (only if they aren't skipped)
for child in widget.children():
if (
child.objectName()
and child.property("skip_settings") is not True
and not isinstance(child, QLabel)
):
self._save_widget_state_qsettings(child, settings)
def _load_widget_state_qsettings(self, widget: QWidget, settings: QSettings):
"""
Load the state of the widget from QSettings.
Args:
widget(QWidget): The widget to load the state for.
settings(QSettings): The QSettings object to load the state from.
"""
if widget.property("skip_settings") is True:
return
meta = widget.metaObject()
widget_name = self._get_full_widget_name(widget)
settings.beginGroup(widget_name)
for i in range(meta.propertyCount()):
prop = meta.property(i)
name = prop.name()
if settings.contains(name):
value = settings.value(name)
widget.setProperty(name, value)
settings.endGroup()
# Recursively process children (only if they aren't skipped)
for child in widget.children():
if (
child.objectName()
and child.property("skip_settings") is not True
and not isinstance(child, QLabel)
):
self._load_widget_state_qsettings(child, settings)
def _get_full_widget_name(self, widget: QWidget):
"""
Get the full name of the widget including its parent names.
Args:
widget(QWidget): The widget to get the full name for.
Returns:
str: The full name of the widget.
"""
name = widget.objectName()
parent = widget.parent()
while parent:
obj_name = parent.objectName() or parent.metaObject().className()
name = obj_name + "." + name
parent = parent.parent()
return name
class ExampleApp(QWidget): # pragma: no cover:
def __init__(self):
super().__init__()
self.setObjectName("MainWindow")
self.setWindowTitle("State Manager Example")
layout = QVBoxLayout(self)
# A line edit to store some user text
self.line_edit = QLineEdit(self)
self.line_edit.setObjectName("MyLineEdit")
self.line_edit.setPlaceholderText("Enter some text here...")
layout.addWidget(self.line_edit)
# A spin box to hold a numeric value
self.spin_box = QSpinBox(self)
self.spin_box.setObjectName("MySpinBox")
self.spin_box.setRange(0, 100)
layout.addWidget(self.spin_box)
# A checkbox to hold a boolean value
self.check_box = QCheckBox("Enable feature?", self)
self.check_box.setObjectName("MyCheckBox")
layout.addWidget(self.check_box)
# A checkbox that we want to skip
self.check_box_skip = QCheckBox("Enable feature - skip save?", self)
self.check_box_skip.setProperty("skip_state", True)
self.check_box_skip.setObjectName("MyCheckBoxSkip")
layout.addWidget(self.check_box_skip)
# CREATE A "SIDE PANEL" with nested structure and skip all what is inside
self.side_panel = QWidget(self)
self.side_panel.setObjectName("SidePanel")
self.side_panel.setProperty("skip_settings", True) # skip the ENTIRE panel
layout.addWidget(self.side_panel)
# Put some sub-widgets inside side_panel
panel_layout = QVBoxLayout(self.side_panel)
self.panel_label = QLabel("Label in side panel", self.side_panel)
self.panel_label.setObjectName("PanelLabel")
panel_layout.addWidget(self.panel_label)
self.panel_edit = QLineEdit(self.side_panel)
self.panel_edit.setObjectName("PanelLineEdit")
self.panel_edit.setPlaceholderText("I am inside side panel")
panel_layout.addWidget(self.panel_edit)
self.panel_checkbox = QCheckBox("Enable feature in side panel?", self.side_panel)
self.panel_checkbox.setObjectName("PanelCheckBox")
panel_layout.addWidget(self.panel_checkbox)
# Save/Load buttons
button_layout = QHBoxLayout()
self.save_button = QPushButton("Save State", self)
self.load_button = QPushButton("Load State", self)
button_layout.addWidget(self.save_button)
button_layout.addWidget(self.load_button)
layout.addLayout(button_layout)
# Create the state manager
self.state_manager = WidgetStateManager(self)
# Connect buttons
self.save_button.clicked.connect(lambda: self.state_manager.save_state())
self.load_button.clicked.connect(lambda: self.state_manager.load_state())
if __name__ == "__main__": # pragma: no cover:
import sys
app = QApplication(sys.argv)
w = ExampleApp()
w.show()
sys.exit(app.exec_())

View File

@@ -0,0 +1 @@

View File

@@ -5,7 +5,6 @@ import enum
from bec_lib.device import ComputedSignal, Device, Positioner, ReadoutPriority
from bec_lib.device import Signal as BECSignal
from bec_lib.logger import bec_logger
from pydantic import field_validator
from qtpy.QtCore import Property, Signal, Slot
from bec_widgets.utils import ConnectionConfig
@@ -26,35 +25,13 @@ class BECDeviceFilter(enum.Enum):
class DeviceInputConfig(ConnectionConfig):
device_filter: list[str] = []
readout_filter: list[str] = []
device_filter: list[BECDeviceFilter] = []
readout_filter: list[ReadoutPriority] = []
devices: list[str] = []
default: str | None = None
arg_name: str | None = None
apply_filter: bool = True
@field_validator("device_filter")
@classmethod
def check_device_filter(cls, v, values):
valid_device_filters = [entry.value for entry in BECDeviceFilter]
for filt in v:
if filt not in valid_device_filters:
raise ValueError(
f"Device filter {filt} is not a valid device filter {valid_device_filters}."
)
return v
@field_validator("readout_filter")
@classmethod
def check_readout_filter(cls, v, values):
valid_device_filters = [entry.value for entry in ReadoutPriority]
for filt in v:
if filt not in valid_device_filters:
raise ValueError(
f"Device filter {filt} is not a valid device filter {valid_device_filters}."
)
return v
class DeviceInputBase(BECWidget):
"""
@@ -82,7 +59,7 @@ class DeviceInputBase(BECWidget):
ReadoutPriority.ON_REQUEST: "readout_on_request",
}
def __init__(self, parent=None, client=None, config=None, gui_id: str | None = None, **kwargs):
def __init__(self, client=None, config=None, gui_id: str = None):
if config is None:
config = DeviceInputConfig(widget_class=self.__class__.__name__)
@@ -90,9 +67,7 @@ class DeviceInputBase(BECWidget):
if isinstance(config, dict):
config = DeviceInputConfig(**config)
self.config = config
super().__init__(
parent=parent, client=client, config=config, gui_id=gui_id, theme_update=True, **kwargs
)
super().__init__(client=client, config=config, gui_id=gui_id, theme_update=True)
self.get_bec_shortcuts()
self._device_filter = []
self._readout_filter = []

View File

@@ -29,21 +29,20 @@ class DeviceSignalInputBase(BECWidget):
signal object based on the current text of the widget.
"""
RPC = False
_filter_handler = {
Kind.hinted: "include_hinted_signals",
Kind.normal: "include_normal_signals",
Kind.config: "include_config_signals",
}
def __init__(self, client=None, config=None, gui_id: str = None, **kwargs):
def __init__(self, client=None, config=None, gui_id: str = None):
if config is None:
config = DeviceSignalInputBaseConfig(widget_class=self.__class__.__name__)
else:
if isinstance(config, dict):
config = DeviceSignalInputBaseConfig(**config)
self.config = config
super().__init__(client=client, config=config, gui_id=gui_id, **kwargs)
super().__init__(client=client, config=config, gui_id=gui_id)
self._device = None
self.get_bec_shortcuts()

View File

@@ -4,7 +4,7 @@
from qtpy.QtDesigner import QDesignerCustomWidgetInterface
from bec_widgets.utils.bec_designer import designer_material_icon
from bec_widgets.widgets.progress.bec_progressbar.bec_progressbar import BECProgressBar
from bec_widgets.widgets.bec_progressbar.bec_progressbar import BECProgressBar
DOM_XML = """
<ui language='c++'>

View File

@@ -14,19 +14,18 @@ class BECProgressBar(BECWidget, QWidget):
A custom progress bar with smooth transitions. The displayed text can be customized using a template.
"""
PLUGIN = True
USER_ACCESS = [
"set_value",
"set_maximum",
"set_minimum",
"label_template",
"label_template.setter",
"_get_label",
]
ICON_NAME = "page_control"
def __init__(self, parent=None, client=None, config=None, gui_id=None, **kwargs):
super().__init__(parent=parent, client=client, gui_id=gui_id, config=config, **kwargs)
def __init__(self, parent=None, client=None, config=None, gui_id=None):
super().__init__(client=client, config=config, gui_id=gui_id)
QWidget.__init__(self, parent=parent)
accent_colors = get_accent_colors()
@@ -236,10 +235,6 @@ class BECProgressBar(BECWidget, QWidget):
(value - self._user_minimum) / (self._user_maximum - self._user_minimum) * self._maximum
)
def _get_label(self) -> str:
"""Return the label text. mostly used for testing rpc."""
return self.center_label.text()
if __name__ == "__main__": # pragma: no cover
app = QApplication(sys.argv)

View File

@@ -6,9 +6,7 @@ def main(): # pragma: no cover
return
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
from bec_widgets.widgets.progress.bec_progressbar.bec_progress_bar_plugin import (
BECProgressBarPlugin,
)
from bec_widgets.widgets.bec_progressbar.bec_progress_bar_plugin import BECProgressBarPlugin
QPyDesignerCustomWidgetCollection.addCustomWidget(BECProgressBarPlugin())

View File

@@ -6,14 +6,14 @@ from qtpy.QtCore import Property, Qt, Signal, Slot
from qtpy.QtGui import QColor
from qtpy.QtWidgets import QHeaderView, QLabel, QTableWidget, QTableWidgetItem, QVBoxLayout, QWidget
from bec_widgets.qt_utils.compact_popup import CompactPopupWidget
from bec_widgets.qt_utils.toolbar import ModularToolBar, SeparatorAction, WidgetAction
from bec_widgets.utils.bec_connector import ConnectionConfig
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.compact_popup import CompactPopupWidget
from bec_widgets.utils.toolbar import ModularToolBar, SeparatorAction, WidgetAction
from bec_widgets.widgets.control.buttons.button_abort.button_abort import AbortButton
from bec_widgets.widgets.control.buttons.button_reset.button_reset import ResetButton
from bec_widgets.widgets.control.buttons.button_resume.button_resume import ResumeButton
from bec_widgets.widgets.control.buttons.stop_button.stop_button import StopButton
from bec_widgets.widgets.button_abort.button_abort import AbortButton
from bec_widgets.widgets.button_reset.button_reset import ResetButton
from bec_widgets.widgets.button_resume.button_resume import ResumeButton
from bec_widgets.widgets.stop_button.stop_button import StopButton
class BECQueue(BECWidget, CompactPopupWidget):
@@ -21,7 +21,6 @@ class BECQueue(BECWidget, CompactPopupWidget):
Widget to display the BEC queue.
"""
PLUGIN = True
ICON_NAME = "edit_note"
status_colors = {
"STOPPED": "red",
@@ -42,11 +41,9 @@ class BECQueue(BECWidget, CompactPopupWidget):
config: ConnectionConfig = None,
gui_id: str = None,
refresh_upon_start: bool = True,
**kwargs,
):
super().__init__(
parent=parent, layout=QVBoxLayout, client=client, gui_id=gui_id, config=config, **kwargs
)
super().__init__(client, config, gui_id)
CompactPopupWidget.__init__(self, parent=parent, layout=QVBoxLayout)
self.layout.setSpacing(0)
self.layout.setContentsMargins(0, 0, 0, 0)
@@ -57,7 +54,6 @@ class BECQueue(BECWidget, CompactPopupWidget):
# self.layout.addWidget(self.table)
self.table.setColumnCount(4)
self.table.setHorizontalHeaderLabels(["Scan Number", "Type", "Status", "Cancel"])
header = self.table.horizontalHeader()
header.setSectionResizeMode(QHeaderView.Stretch)
@@ -74,16 +70,15 @@ class BECQueue(BECWidget, CompactPopupWidget):
"""
Set the toolbar.
"""
widget_label = QLabel(text="Live Queue", parent=self)
widget_label = QLabel("Live Queue")
widget_label.setStyleSheet("font-weight: bold;")
self.toolbar = ModularToolBar(
parent=self,
actions={
"widget_label": WidgetAction(widget=widget_label),
"separator_1": SeparatorAction(),
"resume": WidgetAction(widget=ResumeButton(parent=self, toolbar=False)),
"stop": WidgetAction(widget=StopButton(parent=self, toolbar=False)),
"reset": WidgetAction(widget=ResetButton(parent=self, toolbar=False)),
"resume": WidgetAction(widget=ResumeButton(toolbar=False)),
"stop": WidgetAction(widget=StopButton(toolbar=False)),
"reset": WidgetAction(widget=ResetButton(toolbar=False)),
},
target_widget=self,
)
@@ -225,7 +220,7 @@ class BECQueue(BECWidget, CompactPopupWidget):
Returns:
AbortButton: The abort button.
"""
abort_button = AbortButton(parent=self, scan_id=scan_id)
abort_button = AbortButton(scan_id=scan_id)
abort_button.button.setText("")
abort_button.button.setIcon(
@@ -233,6 +228,7 @@ class BECQueue(BECWidget, CompactPopupWidget):
)
abort_button.button.setStyleSheet("background-color: rgba(0,0,0,0) ")
abort_button.button.setFlat(True)
return abort_button
def delete_selected_row(self):
@@ -240,7 +236,7 @@ class BECQueue(BECWidget, CompactPopupWidget):
button = self.sender()
row = self.table.indexAt(button.pos()).row()
self.table.removeRow(row)
button.close()
button.deleteLater()
def reset_content(self):

View File

@@ -6,7 +6,7 @@ from qtpy.QtDesigner import QDesignerCustomWidgetInterface
import bec_widgets
from bec_widgets.utils.bec_designer import designer_material_icon
from bec_widgets.widgets.services.bec_queue.bec_queue import BECQueue
from bec_widgets.widgets.bec_queue.bec_queue import BECQueue
DOM_XML = """
<ui language='c++'>

View File

@@ -6,7 +6,7 @@ def main(): # pragma: no cover
return
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
from bec_widgets.widgets.services.bec_queue.bec_queue_plugin import BECQueuePlugin
from bec_widgets.widgets.bec_queue.bec_queue_plugin import BECQueuePlugin
QPyDesignerCustomWidgetCollection.addCustomWidget(BECQueuePlugin())

View File

@@ -4,19 +4,20 @@ The widget automatically updates the status of all running BEC services, and dis
from __future__ import annotations
import sys
from collections import defaultdict
from dataclasses import dataclass
from typing import TYPE_CHECKING
from bec_lib.utils.import_utils import lazy_import_from
from qtpy.QtCore import QObject, QTimer, Signal, Slot
from qtpy.QtWidgets import QHBoxLayout, QTreeWidget, QTreeWidgetItem
from qtpy.QtWidgets import QHBoxLayout, QTreeWidget, QTreeWidgetItem, QWidget
from bec_widgets.qt_utils.compact_popup import CompactPopupWidget
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.compact_popup import CompactPopupWidget
from bec_widgets.widgets.services.bec_status_box.status_item import StatusItem
from bec_widgets.widgets.bec_status_box.status_item import StatusItem
if TYPE_CHECKING: # pragma: no cover
if TYPE_CHECKING:
from bec_lib.client import BECClient
# TODO : Put normal imports back when Pydantic gets faster
@@ -74,9 +75,7 @@ class BECStatusBox(BECWidget, CompactPopupWidget):
gui_id Optional(str): The unique id for the widget. Defaults to None.
"""
PLUGIN = True
CORE_SERVICES = ["DeviceServer", "ScanServer", "SciHub", "ScanBundler", "FileWriterManager"]
USER_ACCESS = ["get_server_state", "remove"]
service_update = Signal(BECServiceInfoContainer)
bec_core_state = Signal(str)
@@ -88,9 +87,9 @@ class BECStatusBox(BECWidget, CompactPopupWidget):
client: BECClient = None,
bec_service_status_mixin: BECServiceStatusMixin = None,
gui_id: str = None,
**kwargs,
):
super().__init__(parent=parent, layout=QHBoxLayout, client=client, gui_id=gui_id, **kwargs)
super().__init__(client=client, gui_id=gui_id)
CompactPopupWidget.__init__(self, parent=parent, layout=QHBoxLayout)
self.box_name = box_name
self.status_container = defaultdict(lambda: {"info": None, "item": None, "widget": None})
@@ -135,10 +134,6 @@ class BECStatusBox(BECWidget, CompactPopupWidget):
"QTreeWidget::item:selected {}"
)
def get_server_state(self) -> str:
"""Get the state ("RUNNING", "BUSY", "IDLE", "ERROR") of the BEC server"""
return self.status_container[self.box_name]["info"].status
def _create_status_widget(
self, service_name: str, status=BECStatus, info: dict = None, metrics: dict = None
) -> StatusItem:

View File

@@ -6,7 +6,7 @@ from qtpy.QtDesigner import QDesignerCustomWidgetInterface
import bec_widgets
from bec_widgets.utils.bec_designer import designer_material_icon
from bec_widgets.widgets.services.bec_status_box.bec_status_box import BECStatusBox
from bec_widgets.widgets.bec_status_box.bec_status_box import BECStatusBox
DOM_XML = """
<ui language='c++'>

View File

@@ -6,7 +6,7 @@ def main(): # pragma: no cover
return
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
from bec_widgets.widgets.services.bec_status_box.bec_status_box_plugin import BECStatusBoxPlugin
from bec_widgets.widgets.bec_status_box.bec_status_box_plugin import BECStatusBoxPlugin
QPyDesignerCustomWidgetCollection.addCustomWidget(BECStatusBoxPlugin())

View File

@@ -1,4 +1,4 @@
"""Module for a StatusItem widget to display status and metrics for a BEC service.
""" Module for a StatusItem widget to display status and metrics for a BEC service.
The widget is bound to be used with the BECStatusBox widget."""
import enum

View File

@@ -4,7 +4,7 @@
from qtpy.QtDesigner import QDesignerCustomWidgetInterface
from bec_widgets.utils.bec_designer import designer_material_icon
from bec_widgets.widgets.control.buttons.button_abort.button_abort import AbortButton
from bec_widgets.widgets.button_abort.button_abort import AbortButton
DOM_XML = """
<ui language='c++'>

View File

@@ -2,28 +2,21 @@ from bec_qthemes import material_icon
from qtpy.QtCore import Qt
from qtpy.QtWidgets import QHBoxLayout, QPushButton, QToolButton, QWidget
from bec_widgets.qt_utils.error_popups import SafeSlot
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.error_popups import SafeSlot
class AbortButton(BECWidget, QWidget):
"""A button that abort the scan."""
PLUGIN = True
ICON_NAME = "cancel"
RPC = True
def __init__(
self,
parent=None,
client=None,
config=None,
gui_id=None,
toolbar=False,
scan_id=None,
**kwargs,
self, parent=None, client=None, config=None, gui_id=None, toolbar=False, scan_id=None
):
super().__init__(parent=parent, client=client, gui_id=gui_id, config=config, **kwargs)
super().__init__(client=client, config=config, gui_id=gui_id)
QWidget.__init__(self, parent=parent)
self.get_bec_shortcuts()
self.layout = QHBoxLayout(self)

View File

@@ -6,9 +6,7 @@ def main(): # pragma: no cover
return
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
from bec_widgets.widgets.control.buttons.button_abort.abort_button_plugin import (
AbortButtonPlugin,
)
from bec_widgets.widgets.button_abort.abort_button_plugin import AbortButtonPlugin
QPyDesignerCustomWidgetCollection.addCustomWidget(AbortButtonPlugin())

View File

@@ -2,19 +2,19 @@ from bec_qthemes import material_icon
from qtpy.QtCore import Qt
from qtpy.QtWidgets import QHBoxLayout, QMessageBox, QPushButton, QToolButton, QWidget
from bec_widgets.qt_utils.error_popups import SafeSlot
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.error_popups import SafeSlot
class ResetButton(BECWidget, QWidget):
"""A button that resets the scan queue."""
PLUGIN = True
ICON_NAME = "restart_alt"
RPC = True
def __init__(self, parent=None, client=None, config=None, gui_id=None, toolbar=False, **kwargs):
super().__init__(parent=parent, client=client, gui_id=gui_id, config=config, **kwargs)
def __init__(self, parent=None, client=None, config=None, gui_id=None, toolbar=False):
super().__init__(client=client, config=config, gui_id=gui_id)
QWidget.__init__(self, parent=parent)
self.get_bec_shortcuts()
self.layout = QHBoxLayout(self)

View File

@@ -6,9 +6,7 @@ def main(): # pragma: no cover
return
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
from bec_widgets.widgets.control.buttons.button_reset.reset_button_plugin import (
ResetButtonPlugin,
)
from bec_widgets.widgets.button_reset.reset_button_plugin import ResetButtonPlugin
QPyDesignerCustomWidgetCollection.addCustomWidget(ResetButtonPlugin())

View File

@@ -4,7 +4,7 @@
from qtpy.QtDesigner import QDesignerCustomWidgetInterface
from bec_widgets.utils.bec_designer import designer_material_icon
from bec_widgets.widgets.control.buttons.button_reset.button_reset import ResetButton
from bec_widgets.widgets.button_reset.button_reset import ResetButton
DOM_XML = """
<ui language='c++'>

View File

@@ -2,19 +2,18 @@ from bec_qthemes import material_icon
from qtpy.QtCore import Qt
from qtpy.QtWidgets import QHBoxLayout, QPushButton, QToolButton, QWidget
from bec_widgets.qt_utils.error_popups import SafeSlot
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.error_popups import SafeSlot
class ResumeButton(BECWidget, QWidget):
"""A button that continue scan queue."""
PLUGIN = True
ICON_NAME = "resume"
RPC = True
def __init__(self, parent=None, client=None, config=None, gui_id=None, toolbar=False, **kwargs):
super().__init__(parent=parent, client=client, gui_id=gui_id, config=config, **kwargs)
def __init__(self, parent=None, client=None, config=None, gui_id=None, toolbar=False):
super().__init__(client=client, config=config, gui_id=gui_id)
QWidget.__init__(self, parent=parent)
self.get_bec_shortcuts()

View File

@@ -6,9 +6,7 @@ def main(): # pragma: no cover
return
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
from bec_widgets.widgets.control.buttons.button_resume.resume_button_plugin import (
ResumeButtonPlugin,
)
from bec_widgets.widgets.button_resume.resume_button_plugin import ResumeButtonPlugin
QPyDesignerCustomWidgetCollection.addCustomWidget(ResumeButtonPlugin())

Some files were not shown because too many files have changed in this diff Show More