1
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2026-04-12 19:50:54 +02:00

Compare commits

..

1 Commits

32 changed files with 173 additions and 1030 deletions

View File

@@ -1,28 +0,0 @@
name: Check PR status for branch
on:
workflow_call:
outputs:
branch-pr:
description: The PR number if the branch is in one
value: ${{ jobs.pr.outputs.branch-pr }}
jobs:
pr:
runs-on: "ubuntu-latest"
outputs:
branch-pr: ${{ steps.script.outputs.result }}
steps:
- uses: actions/github-script@v7
id: script
if: github.event_name == 'push'
with:
script: |
const prs = await github.rest.pulls.list({
owner: context.repo.owner,
repo: context.repo.repo,
head: context.repo.owner + ':${{ github.ref_name }}'
})
if (prs.data.length) {
console.log(`::notice ::Skipping CI on branch push as it is already run in PR #${prs.data[0]["number"]}`)
return prs.data[0]["number"]
}

View File

@@ -1,36 +0,0 @@
name: Full CI
on: [push, pull_request]
permissions:
pull-requests: write
jobs:
check_pr_status:
uses: ./.github/workflows/check_pr.yml
formatter:
needs: check_pr_status
if: needs.check_pr_status.outputs.branch-pr == ''
uses: ./.github/workflows/formatter.yml
unit-test:
needs: [check_pr_status, formatter]
if: needs.check_pr_status.outputs.branch-pr == ''
uses: ./.github/workflows/pytest.yml
secrets:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
unit-test-matrix:
needs: [check_pr_status, formatter]
if: needs.check_pr_status.outputs.branch-pr == ''
uses: ./.github/workflows/pytest-matrix.yml
generate-cli-test:
needs: [check_pr_status, formatter]
if: needs.check_pr_status.outputs.branch-pr == ''
uses: ./.github/workflows/generate-cli-check.yml
end2end-test:
needs: [check_pr_status, formatter]
if: needs.check_pr_status.outputs.branch-pr == ''
uses: ./.github/workflows/end2end-conda.yml

View File

@@ -1,48 +0,0 @@
name: Run Pytest with Coverage
on: [workflow_call]
jobs:
pytest:
runs-on: ubuntu-latest
defaults:
run:
shell: bash -el {0}
env:
CHILD_PIPELINE_BRANCH: main # Set the branch you want for ophyd_devices
BEC_CORE_BRANCH: main # Set the branch you want for bec
OPHYD_DEVICES_BRANCH: main # Set the branch you want for ophyd_devices
PROJECT_PATH: ${{ github.repository }}
QTWEBENGINE_DISABLE_SANDBOX: 1
QT_QPA_PLATFORM: "offscreen"
steps:
- uses: actions/checkout@v4
- name: Set up Conda
uses: conda-incubator/setup-miniconda@v3
with:
auto-update-conda: true
auto-activate-base: true
python-version: '3.11'
- name: Install dependencies
run: |
sudo apt-get update
sudo apt-get install -y libgl1 libegl1 x11-utils libxkbcommon-x11-0 libdbus-1-3 xvfb
sudo apt-get -y install libnss3 libxdamage1 libasound2t64 libatomic1 libxcursor1
- name: Conda install and run pytest
run: |
echo -e "\033[35;1m Using branch $BEC_CORE_BRANCH of BEC CORE \033[0;m";
git clone --branch $BEC_CORE_BRANCH https://github.com/bec-project/bec.git
echo -e "\033[35;1m Using branch $OPHYD_DEVICES_BRANCH of OPHYD_DEVICES \033[0;m";
git clone --branch $OPHYD_DEVICES_BRANCH https://github.com/bec-project/ophyd_devices.git
export OHPYD_DEVICES_PATH=$PWD/ophyd_devices
cd ./bec
conda create -q -n test-environment python=3.11
source ./bin/install_bec_dev.sh -t
cd ../
pip install -e ./ophyd_devices
pip install -e .[dev,pyside6]
pytest -v --files-path ./ --start-servers --random-order ./tests/end-2-end

View File

@@ -1,61 +0,0 @@
name: Formatter and Pylint jobs
on: [workflow_call]
jobs:
Formatter:
runs-on: ubuntu-latest
steps:
- name: Check out repository code
uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.13'
- name: Run black and isort
run: |
pip install black isort
pip install -e .[dev]
black --check --diff --color .
isort --check --diff ./
Pylint:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.13'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install pylint pylint-exit anybadge
- name: Run Pylint
run: |
mkdir -p ./pylint
set +e
pylint ./${{ github.event.repository.name }} --output-format=text > ./pylint/pylint.log
pylint-exit $?
set -e
- name: Extract Pylint Score
id: score
run: |
SCORE=$(sed -n 's/^Your code has been rated at \([-0-9.]*\)\/.*/\1/p' ./pylint/pylint.log)
echo "score=$SCORE" >> $GITHUB_OUTPUT
- name: Create Badge
run: |
anybadge --label=Pylint --file=./pylint/pylint.svg --value="${{ steps.score.outputs.score }}" 2=red 4=orange 8=yellow 10=green
- name: Upload Artifacts
uses: actions/upload-artifact@v4
with:
name: pylint-artifacts
path: |
# ./pylint/pylint.log # not sure why this isn't working
./pylint/pylint.svg

View File

@@ -1,49 +0,0 @@
name: Run bw-generate-cli
on: [workflow_call]
jobs:
pytest:
runs-on: ubuntu-latest
defaults:
run:
shell: bash -el {0}
env:
CHILD_PIPELINE_BRANCH: main # Set the branch you want for ophyd_devices
BEC_CORE_BRANCH: main # Set the branch you want for bec
OPHYD_DEVICES_BRANCH: main # Set the branch you want for ophyd_devices
PROJECT_PATH: ${{ github.repository }}
QTWEBENGINE_DISABLE_SANDBOX: 1
QT_QPA_PLATFORM: "offscreen"
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.11"
- name: Install os dependencies
run: |
sudo apt-get update
sudo apt-get install -y libgl1 libegl1 x11-utils libxkbcommon-x11-0 libdbus-1-3 xvfb
sudo apt-get -y install libnss3 libxdamage1 libasound2t64 libatomic1 libxcursor1
- name: Clone and install dependencies
run: |
echo -e "\033[35;1m Using branch $BEC_CORE_BRANCH of BEC CORE \033[0;m";
git clone --branch $BEC_CORE_BRANCH https://github.com/bec-project/bec.git
echo -e "\033[35;1m Using branch $OPHYD_DEVICES_BRANCH of OPHYD_DEVICES \033[0;m";
git clone --branch $OPHYD_DEVICES_BRANCH https://github.com/bec-project/ophyd_devices.git
export OHPYD_DEVICES_PATH=$PWD/ophyd_devices
pip install -e ./ophyd_devices
pip install -e ./bec/bec_lib[dev]
pip install -e ./bec/bec_ipython_client
pip install -e .[dev,pyside6]
- name: Run bw-generate-cli
run: |
bw-generate-cli --target bec_widgets
git diff --exit-code

View File

@@ -1,48 +0,0 @@
name: Run Pytest with different Python versions
on: [workflow_call]
jobs:
pytest-matrix:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.10", "3.11", "3.12"]
env:
CHILD_PIPELINE_BRANCH: main # Set the branch you want for ophyd_devices
BEC_CORE_BRANCH: main # Set the branch you want for bec
OPHYD_DEVICES_BRANCH: main # Set the branch you want for ophyd_devices
PROJECT_PATH: ${{ github.repository }}
QTWEBENGINE_DISABLE_SANDBOX: 1
QT_QPA_PLATFORM: "offscreen"
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
sudo apt-get update
sudo apt-get install -y libgl1 libegl1 x11-utils libxkbcommon-x11-0 libdbus-1-3 xvfb
sudo apt-get -y install libnss3 libxdamage1 libasound2t64 libatomic1 libxcursor1
- name: Clone and install dependencies
run: |
echo -e "\033[35;1m Using branch $BEC_CORE_BRANCH of BEC CORE \033[0;m";
git clone --branch $BEC_CORE_BRANCH https://github.com/bec-project/bec.git
echo -e "\033[35;1m Using branch $OPHYD_DEVICES_BRANCH of OPHYD_DEVICES \033[0;m";
git clone --branch $OPHYD_DEVICES_BRANCH https://github.com/bec-project/ophyd_devices.git
export OHPYD_DEVICES_PATH=$PWD/ophyd_devices
pip install -e ./ophyd_devices
pip install -e ./bec/bec_lib[dev]
pip install -e ./bec/bec_ipython_client
pip install -e .[dev,pyside6]
- name: Run Pytest
run: |
pip install pytest pytest-random-order
pytest -v --maxfail=2 --junitxml=report.xml --random-order ./tests/unit_tests

View File

@@ -1,64 +0,0 @@
name: Run Pytest with Coverage
on:
workflow_call:
inputs:
pr_number:
description: 'Pull request number'
required: false
type: number
secrets:
CODECOV_TOKEN:
required: true
permissions:
pull-requests: write
jobs:
pytest:
runs-on: ubuntu-latest
env:
CHILD_PIPELINE_BRANCH: main # Set the branch you want for ophyd_devices
BEC_CORE_BRANCH: main # Set the branch you want for bec
OPHYD_DEVICES_BRANCH: main # Set the branch you want for ophyd_devices
PROJECT_PATH: ${{ github.repository }}
QTWEBENGINE_DISABLE_SANDBOX: 1
QT_QPA_PLATFORM: "offscreen"
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Install dependencies
run: |
sudo apt-get update
sudo apt-get install -y libgl1 libegl1 x11-utils libxkbcommon-x11-0 libdbus-1-3 xvfb
sudo apt-get -y install libnss3 libxdamage1 libasound2t64 libatomic1 libxcursor1
- name: Clone and install dependencies
run: |
echo -e "\033[35;1m Using branch $BEC_CORE_BRANCH of BEC CORE \033[0;m";
git clone --branch $BEC_CORE_BRANCH https://github.com/bec-project/bec.git
echo -e "\033[35;1m Using branch $OPHYD_DEVICES_BRANCH of OPHYD_DEVICES \033[0;m";
git clone --branch $OPHYD_DEVICES_BRANCH https://github.com/bec-project/ophyd_devices.git
export OHPYD_DEVICES_PATH=$PWD/ophyd_devices
pip install -e ./ophyd_devices
pip install -e ./bec/bec_lib[dev]
pip install -e ./bec/bec_ipython_client
pip install -e .[dev,pyside6]
- name: Run Pytest with Coverage
id: coverage
run: pytest --random-order --cov --cov-config=pyproject.toml --cov-branch --cov-report=xml --no-cov-on-fail tests/unit_tests/
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v5
with:
token: ${{ secrets.CODECOV_TOKEN }}
slug: bec-project/bec_widgets

View File

@@ -1,35 +1,6 @@
# CHANGELOG
## v2.3.0 (2025-05-09)
### Features
- **bec_connector**: Ability to change object name during runtime
([`dc151cd`](https://gitlab.psi.ch/bec/bec_widgets/-/commit/dc151cdfe39f1f0507eeee307a35c1677ae4d8c5))
## v2.2.0 (2025-05-09)
### Features
- **launcher**: Add support for launching plugin widget
([`1fb680a`](https://gitlab.psi.ch/bec/bec_widgets/-/commit/1fb680abb40668e72007c245f32c80112466c46e))
### Refactoring
- **launch_window**: Widget tile added
([`b9e56c9`](https://gitlab.psi.ch/bec/bec_widgets/-/commit/b9e56c96cbae561beb893cedb7d18e9b6a7bfc76))
## v2.1.3 (2025-05-07)
### Bug Fixes
- **bec-dispatcher**: Fix reference to boundmethods to avoid duplicated subscriptions
([`cf59d31`](https://gitlab.psi.ch/bec/bec_widgets/-/commit/cf59d311132cd1a21f1893c19cc9f2a7e45101d0))
## v2.1.2 (2025-05-06)
### Bug Fixes

View File

@@ -1,6 +1,6 @@
BSD 3-Clause License
Copyright (c) 2025, Paul Scherrer Institute
Copyright (c) 2023, bec
All rights reserved.
Redistribution and use in source and binary forms, with or without

View File

@@ -1,16 +1,5 @@
# BEC Widgets
[![CI](https://github.com/bec-project/bec_widgets/actions/workflows/ci.yml/badge.svg)](https://github.com/bec-project/bec_widgets/actions/workflows/ci.yml)
[![badge](https://img.shields.io/pypi/v/bec-widgets)](https://pypi.org/project/bec-widgets/)
[![License](https://img.shields.io/github/license/bec-project/bec_widgets)](./LICENSE)
[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
[![Python](https://img.shields.io/badge/python-3.10%20%7C%203.11%20%7C%203.12-blue?logo=python&logoColor=white)](https://www.python.org)
[![PySide6](https://img.shields.io/badge/PySide6-blue?logo=qt&logoColor=white)](https://doc.qt.io/qtforpython/)
[![Conventional Commits](https://img.shields.io/badge/conventional%20commits-1.0.0-yellow?logo=conventionalcommits&logoColor=white)](https://conventionalcommits.org)
[![codecov](https://codecov.io/gh/bec-project/bec_widgets/graph/badge.svg?token=0Z9IQRJKMY)](https://codecov.io/gh/bec-project/bec_widgets)
**⚠️ Important Notice:**
🚨 **PyQt6 is no longer supported** due to incompatibilities with Qt Designer. Please use **PySide6** instead. 🚨

View File

@@ -2,10 +2,10 @@ from __future__ import annotations
import os
import xml.etree.ElementTree as ET
from typing import TYPE_CHECKING, Callable
from typing import TYPE_CHECKING
from bec_lib.logger import bec_logger
from qtpy.QtCore import Qt, Signal # type: ignore
from qtpy.QtCore import Qt, Signal
from qtpy.QtGui import QPainter, QPainterPath, QPixmap
from qtpy.QtWidgets import (
QApplication,
@@ -21,10 +21,8 @@ from qtpy.QtWidgets import (
import bec_widgets
from bec_widgets.cli.rpc.rpc_register import RPCRegister
from bec_widgets.utils.bec_plugin_helper import get_all_plugin_widgets
from bec_widgets.utils.container_utils import WidgetContainerUtils
from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.utils.name_utils import pascal_to_snake
from bec_widgets.utils.plugin_utils import get_plugin_auto_updates
from bec_widgets.utils.round_frame import RoundedFrame
from bec_widgets.utils.toolbar import ModularToolBar
@@ -37,8 +35,6 @@ from bec_widgets.widgets.utility.visual.dark_mode_button.dark_mode_button import
if TYPE_CHECKING: # pragma: no cover
from qtpy.QtCore import QObject
from bec_widgets.utils.bec_widget import BECWidget
logger = bec_logger.logger
MODULE_PATH = os.path.dirname(bec_widgets.__file__)
@@ -145,7 +141,6 @@ class LaunchWindow(BECMainWindow):
super().__init__(parent=parent, gui_id=gui_id, window_title=window_title, **kwargs)
self.app = QApplication.instance()
self.tiles: dict[str, LaunchTile] = {}
# Toolbar
self.dark_mode_button = DarkModeButton(parent=self, toolbar=True)
@@ -161,105 +156,58 @@ class LaunchWindow(BECMainWindow):
self.central_widget.layout = QHBoxLayout(self.central_widget)
self.setCentralWidget(self.central_widget)
self.register_tile(
name="dock_area",
self.tile_dock_area = LaunchTile(
icon_path=os.path.join(MODULE_PATH, "assets", "app_icons", "bec_widgets_icon.png"),
top_label="Get started",
main_label="BEC Dock Area",
description="Highly flexible and customizable dock area application with modular widgets.",
action_button=lambda: self.launch("dock_area"),
show_selector=False,
)
self.tile_dock_area.setFixedSize(*self.TILE_SIZE)
self.available_auto_updates: dict[str, type[AutoUpdates]] = (
self._update_available_auto_updates()
)
self.register_tile(
name="auto_update",
self.tile_auto_update = LaunchTile(
icon_path=os.path.join(MODULE_PATH, "assets", "app_icons", "auto_update.png"),
top_label="Get automated",
main_label="BEC Auto Update Dock Area",
description="Dock area with auto update functionality for BEC widgets plotting.",
action_button=self._open_auto_update,
show_selector=True,
selector_items=list(self.available_auto_updates.keys()) + ["Default"],
)
self.tile_auto_update.setFixedSize(*self.TILE_SIZE)
self.register_tile(
name="custom_ui_file",
self.tile_ui_file = LaunchTile(
icon_path=os.path.join(MODULE_PATH, "assets", "app_icons", "ui_loader_tile.png"),
top_label="Get customized",
main_label="Launch Custom UI File",
description="GUI application with custom UI file.",
action_button=self._open_custom_ui_file,
show_selector=False,
)
self.tile_ui_file.setFixedSize(*self.TILE_SIZE)
# plugin widgets
self.available_widgets: dict[str, BECWidget] = get_all_plugin_widgets()
if self.available_widgets:
plugin_repo_name = next(iter(self.available_widgets.values())).__module__.split(".")[0]
plugin_repo_name = plugin_repo_name.removesuffix("_bec").upper()
self.register_tile(
name="widget",
icon_path=os.path.join(
MODULE_PATH, "assets", "app_icons", "widget_launch_tile.png"
),
top_label="Get quickly started",
main_label=f"Launch a {plugin_repo_name} Widget",
description=f"GUI application with one widget from the {plugin_repo_name} repository.",
action_button=self._open_widget,
show_selector=True,
selector_items=list(self.available_widgets.keys()),
)
# Add tiles to the main layout
self.central_widget.layout.addWidget(self.tile_dock_area)
self.central_widget.layout.addWidget(self.tile_auto_update)
self.central_widget.layout.addWidget(self.tile_ui_file)
# hacky solution no time to waste
self.tiles = [self.tile_dock_area, self.tile_auto_update, self.tile_ui_file]
# Connect signals
self.tile_dock_area.action_button.clicked.connect(lambda: self.launch("dock_area"))
self.tile_auto_update.action_button.clicked.connect(self._open_auto_update)
self.tile_ui_file.action_button.clicked.connect(self._open_custom_ui_file)
self._update_theme()
# Auto updates
self.available_auto_updates: dict[str, type[AutoUpdates]] = (
self._update_available_auto_updates()
)
if self.tile_auto_update.selector is not None:
self.tile_auto_update.selector.addItems(
list(self.available_auto_updates.keys()) + ["Default"]
)
self.register = RPCRegister()
self.register.callbacks.append(self._turn_off_the_lights)
self.register.broadcast()
def register_tile(
self,
name: str,
icon_path: str | None = None,
top_label: str | None = None,
main_label: str | None = None,
description: str | None = None,
action_button: Callable | None = None,
show_selector: bool = False,
selector_items: list[str] | None = None,
):
"""
Register a tile in the launcher window.
Args:
name(str): The name of the tile.
icon_path(str): The path to the icon.
top_label(str): The top label of the tile.
main_label(str): The main label of the tile.
description(str): The description of the tile.
action_button(callable): The action to be performed when the button is clicked.
show_selector(bool): Whether to show a selector or not.
selector_items(list[str]): The items to be shown in the selector.
"""
tile = LaunchTile(
icon_path=icon_path,
top_label=top_label,
main_label=main_label,
description=description,
show_selector=show_selector,
)
tile.setFixedSize(*self.TILE_SIZE)
if action_button:
tile.action_button.clicked.connect(action_button)
if show_selector and selector_items:
tile.selector.addItems(selector_items)
self.central_widget.layout.addWidget(tile)
self.tiles[name] = tile
def launch(
self,
launch_script: str,
@@ -308,12 +256,6 @@ class LaunchWindow(BECMainWindow):
auto_update = kwargs.pop("auto_update", None)
return self._launch_auto_update(auto_update)
if launch_script == "widget":
widget = kwargs.pop("widget", None)
if widget is None:
raise ValueError("Widget name must be provided.")
return self._launch_widget(widget)
launch = getattr(bw_launch, launch_script, None)
if launch is None:
raise ValueError(f"Launch script {launch_script} not found.")
@@ -331,7 +273,6 @@ class LaunchWindow(BECMainWindow):
else:
window = BECMainWindow()
window.setCentralWidget(result_widget)
window.setWindowTitle(f"BEC - {result_widget.objectName()}")
window.show()
return result_widget
@@ -380,28 +321,11 @@ class LaunchWindow(BECMainWindow):
window.show()
return window
def _launch_widget(self, widget: type[BECWidget]) -> QWidget:
name = pascal_to_snake(widget.__name__)
WidgetContainerUtils.raise_for_invalid_name(name)
window = BECMainWindow()
widget_instance = widget(root_widget=True, object_name=name)
assert isinstance(widget_instance, QWidget)
QApplication.processEvents()
window.setCentralWidget(widget_instance)
window.resize(window.minimumSizeHint())
window.setWindowTitle(f"BEC - {widget_instance.objectName()}")
window.show()
return window
def apply_theme(self, theme: str):
"""
Change the theme of the application.
"""
for tile in self.tiles.values():
for tile in self.tiles:
tile.apply_theme(theme)
super().apply_theme(theme)
@@ -410,25 +334,14 @@ class LaunchWindow(BECMainWindow):
"""
Open the auto update window.
"""
if self.tiles["auto_update"].selector is None:
if self.tile_auto_update.selector is None:
auto_update = None
else:
auto_update = self.tiles["auto_update"].selector.currentText()
auto_update = self.tile_auto_update.selector.currentText()
if auto_update == "Default":
auto_update = None
return self.launch("auto_update", auto_update=auto_update)
def _open_widget(self):
"""
Open a widget from the available widgets.
"""
if self.tiles["widget"].selector is None:
return
widget = self.tiles["widget"].selector.currentText()
if widget not in self.available_widgets:
raise ValueError(f"Widget {widget} not found in available widgets.")
return self.launch("widget", widget=self.available_widgets[widget])
@SafeSlot(popup_error=True)
def _open_custom_ui_file(self):
"""

Binary file not shown.

Before

Width:  |  Height:  |  Size: 2.1 MiB

View File

@@ -205,17 +205,6 @@ class BECConnector:
f"This is not necessarily an error as the parent may be deleted before the child and includes already a cleanup. The following exception was raised:\n{content}"
)
def change_object_name(self, name: str) -> None:
"""
Change the object name of the widget. Unregister old name and register the new one.
Args:
name (str): The new object name.
"""
self.rpc_register.remove_rpc(self)
self.setObjectName(name.replace("-", "_").replace(" ", "_"))
QTimer.singleShot(0, self._update_object_name)
def _update_object_name(self) -> None:
"""
Enforce a unique object name among siblings and register the object for RPC.

View File

@@ -4,9 +4,8 @@ import collections
import random
import string
from collections.abc import Callable
from typing import TYPE_CHECKING, DefaultDict, Hashable, Union
from typing import TYPE_CHECKING, Union
import louie
import redis
from bec_lib.client import BECClient
from bec_lib.logger import bec_logger
@@ -42,25 +41,15 @@ class QtThreadSafeCallback(QObject):
self.cb_info = cb_info
self.cb = cb
self.cb_ref = louie.saferef.safe_ref(cb)
self.cb_signal.connect(self.cb)
self.topics = set()
def __hash__(self):
# make 2 differents QtThreadSafeCallback to look
# identical when used as dictionary keys, if the
# callback is the same
return f"{id(self.cb_ref)}{self.cb_info}".__hash__()
def __eq__(self, other):
if not isinstance(other, QtThreadSafeCallback):
return False
return self.cb_ref == other.cb_ref and self.cb_info == other.cb_info
return f"{id(self.cb)}{self.cb_info}".__hash__()
def __call__(self, msg_content, metadata):
if self.cb_ref() is None:
# callback has been deleted
return
self.cb_signal.emit(msg_content, metadata)
@@ -107,7 +96,7 @@ class BECDispatcher:
cls,
client=None,
config: str | ServiceConfig | None = None,
gui_id: str | None = None,
gui_id: str = None,
*args,
**kwargs,
):
@@ -120,9 +109,7 @@ class BECDispatcher:
if self._initialized:
return
self._registered_slots: DefaultDict[Hashable, QtThreadSafeCallback] = (
collections.defaultdict()
)
self._slots = collections.defaultdict(set)
self.client = client
if self.client is None:
@@ -175,13 +162,10 @@ class BECDispatcher:
topics (EndpointInfo | str | list): A topic or list of topics that can typically be acquired via bec_lib.MessageEndpoints
cb_info (dict | None): A dictionary containing information about the callback. Defaults to None.
"""
qt_slot = QtThreadSafeCallback(cb=slot, cb_info=cb_info)
if qt_slot not in self._registered_slots:
self._registered_slots[qt_slot] = qt_slot
qt_slot = self._registered_slots[qt_slot]
self.client.connector.register(topics, cb=qt_slot, **kwargs)
slot = QtThreadSafeCallback(cb=slot, cb_info=cb_info)
self.client.connector.register(topics, cb=slot, **kwargs)
topics_str, _ = self.client.connector._convert_endpointinfo(topics)
qt_slot.topics.update(set(topics_str))
self._slots[slot].update(set(topics_str))
def disconnect_slot(self, slot: Callable, topics: Union[str, list]):
"""
@@ -194,16 +178,16 @@ class BECDispatcher:
# find the right slot to disconnect from ;
# slot callbacks are wrapped in QtThreadSafeCallback objects,
# but the slot we receive here is the original callable
for connected_slot in self._registered_slots.values():
for connected_slot in self._slots:
if connected_slot.cb == slot:
break
else:
return
self.client.connector.unregister(topics, cb=connected_slot)
topics_str, _ = self.client.connector._convert_endpointinfo(topics)
self._registered_slots[connected_slot].topics.difference_update(set(topics_str))
if not self._registered_slots[connected_slot].topics:
del self._registered_slots[connected_slot]
self._slots[connected_slot].difference_update(set(topics_str))
if not self._slots[connected_slot]:
del self._slots[connected_slot]
def disconnect_topics(self, topics: Union[str, list]):
"""
@@ -214,16 +198,11 @@ class BECDispatcher:
"""
self.client.connector.unregister(topics)
topics_str, _ = self.client.connector._convert_endpointinfo(topics)
remove_slots = []
for connected_slot in self._registered_slots.values():
connected_slot.topics.difference_update(set(topics_str))
if not connected_slot.topics:
remove_slots.append(connected_slot)
for connected_slot in remove_slots:
self._registered_slots.pop(connected_slot, None)
for slot in list(self._slots.keys()):
slot_topics = self._slots[slot]
slot_topics.difference_update(set(topics_str))
if not slot_topics:
del self._slots[slot]
def disconnect_all(self, *args, **kwargs):
"""

View File

@@ -31,7 +31,6 @@ class SidePanel(QWidget):
panel_max_width: int = 200,
animation_duration: int = 200,
animations_enabled: bool = True,
show_toolbar: bool = True,
):
super().__init__(parent=parent)
@@ -41,7 +40,6 @@ class SidePanel(QWidget):
self._panel_max_width = panel_max_width
self._animation_duration = animation_duration
self._animations_enabled = animations_enabled
self._show_toolbar = show_toolbar
self._panel_width = 0
self._panel_height = 0
@@ -73,14 +71,13 @@ class SidePanel(QWidget):
self.stack_widget.setMinimumWidth(5)
self.stack_widget.setMaximumWidth(self._panel_max_width)
if self._orientation in ("left", "right"):
if self._show_toolbar:
self.main_layout.addWidget(self.toolbar)
if self._orientation == "left":
self.main_layout.addWidget(self.toolbar)
self.main_layout.addWidget(self.container)
else:
self.main_layout.addWidget(self.container)
self.main_layout.addWidget(self.toolbar)
if self._orientation == "left":
self.main_layout.addWidget(self.container)
else:
self.main_layout.insertWidget(0, self.container)
self.container.layout.addWidget(self.stack_widget)
self.menu_anim = QPropertyAnimation(self, b"panel_width")
@@ -105,13 +102,11 @@ class SidePanel(QWidget):
self.stack_widget.setMaximumHeight(self._panel_max_width)
if self._orientation == "top":
if self._show_toolbar:
self.main_layout.addWidget(self.toolbar)
self.main_layout.addWidget(self.toolbar)
self.main_layout.addWidget(self.container)
else:
self.main_layout.addWidget(self.container)
if self._show_toolbar:
self.main_layout.addWidget(self.toolbar)
self.main_layout.addWidget(self.toolbar)
self.container.layout.addWidget(self.stack_widget)
@@ -238,24 +233,21 @@ class SidePanel(QWidget):
def add_menu(
self,
action_id: str,
icon_name: str,
tooltip: str,
widget: QWidget,
action_id: str | None = None,
icon_name: str | None = None,
tooltip: str | None = None,
title: str | None = None,
) -> int:
):
"""
Add a menu to the side panel.
Args:
action_id(str): The ID of the action.
icon_name(str): The name of the icon.
tooltip(str): The tooltip for the action.
widget(QWidget): The widget to add to the panel.
action_id(str | None): The ID of the action. Optional if no toolbar action is needed.
icon_name(str | None): The name of the icon. Optional if no toolbar action is needed.
tooltip(str | None): The tooltip for the action. Optional if no toolbar action is needed.
title(str | None): The title of the panel.
Returns:
int: The index of the added panel, which can be used with show_panel() and switch_to().
title(str): The title of the panel.
"""
# container_widget: top-level container for the stacked page
container_widget = QWidget()
@@ -286,35 +278,32 @@ class SidePanel(QWidget):
index = self.stack_widget.count()
self.stack_widget.addWidget(container_widget)
# Add an action to the toolbar if action_id, icon_name, and tooltip are provided
if action_id is not None and icon_name is not None and tooltip is not None:
action = MaterialIconAction(icon_name=icon_name, tooltip=tooltip, checkable=True)
self.toolbar.add_action(action_id, action, target_widget=self)
# Add an action to the toolbar
action = MaterialIconAction(icon_name=icon_name, tooltip=tooltip, checkable=True)
self.toolbar.add_action(action_id, action, target_widget=self)
def on_action_toggled(checked: bool):
if self.switching_actions:
return
def on_action_toggled(checked: bool):
if self.switching_actions:
return
if checked:
if self.current_action and self.current_action != action.action:
self.switching_actions = True
self.current_action.setChecked(False)
self.switching_actions = False
if checked:
if self.current_action and self.current_action != action.action:
self.switching_actions = True
self.current_action.setChecked(False)
self.switching_actions = False
self.current_action = action.action
self.current_action = action.action
if not self.panel_visible:
self.show_panel(index)
else:
self.switch_to(index)
if not self.panel_visible:
self.show_panel(index)
else:
if self.current_action == action.action:
self.current_action = None
self.hide_panel()
self.switch_to(index)
else:
if self.current_action == action.action:
self.current_action = None
self.hide_panel()
action.action.toggled.connect(on_action_toggled)
return index
action.action.toggled.connect(on_action_toggled)
############################################
@@ -343,56 +332,41 @@ class ExampleApp(QMainWindow): # pragma: no cover
self.add_side_menus()
def add_side_menus(self):
# Example 1: With action, icon, and tooltip
widget1 = QWidget()
layout1 = QVBoxLayout(widget1)
for i in range(15):
layout1.addWidget(QLabel(f"Widget 1 label row {i}"))
self.side_panel.add_menu(
widget=widget1,
action_id="widget1",
icon_name="counter_1",
tooltip="Show Widget 1",
widget=widget1,
title="Widget 1 Panel",
)
# Example 2: With action, icon, and tooltip
widget2 = QWidget()
layout2 = QVBoxLayout(widget2)
layout2.addWidget(QLabel("Short widget 2 content"))
self.side_panel.add_menu(
widget=widget2,
action_id="widget2",
icon_name="counter_2",
tooltip="Show Widget 2",
widget=widget2,
title="Widget 2 Panel",
)
# Example 3: With action, icon, and tooltip
widget3 = QWidget()
layout3 = QVBoxLayout(widget3)
for i in range(10):
layout3.addWidget(QLabel(f"Line {i} for Widget 3"))
self.side_panel.add_menu(
widget=widget3,
action_id="widget3",
icon_name="counter_3",
tooltip="Show Widget 3",
widget=widget3,
title="Widget 3 Panel",
)
# Example 4: Without action, icon, and tooltip (can only be shown programmatically)
widget4 = QWidget()
layout4 = QVBoxLayout(widget4)
layout4.addWidget(QLabel("This panel has no toolbar button"))
layout4.addWidget(QLabel("It can only be shown programmatically"))
self.hidden_panel_index = self.side_panel.add_menu(widget=widget4, title="Hidden Panel")
# Example of how to show the hidden panel programmatically after 3 seconds
from qtpy.QtCore import QTimer
QTimer.singleShot(3000, lambda: self.side_panel.show_panel(self.hidden_panel_index))
if __name__ == "__main__": # pragma: no cover
app = QApplication(sys.argv)

View File

@@ -53,7 +53,7 @@ class LayoutManagerWidget(QWidget):
self,
widget: QWidget | str,
row: int | None = None,
col: int | None = None,
col: Optional[int] = None,
rowspan: int = 1,
colspan: int = 1,
shift_existing: bool = True,
@@ -138,39 +138,6 @@ class LayoutManagerWidget(QWidget):
ref_row, ref_col, ref_rowspan, ref_colspan = self.widget_positions[reference_widget]
# Determine new widget position based on the specified relative position
# If adding to the left or right with shifting, shift the entire column
if (
position in ("left", "right")
and shift_existing
and shift_direction in ("left", "right")
):
column = ref_col
# Collect all rows in this column and sort for safe shifting
rows = sorted(
{row for (row, col) in self.position_widgets.keys() if col == column},
reverse=(shift_direction == "right"),
)
# Shift each widget in the column
for r in rows:
self.shift_widgets(direction=shift_direction, start_row=r, start_col=column)
# Update reference widget's position after the column shift
ref_row, ref_col, ref_rowspan, ref_colspan = self.widget_positions[reference_widget]
new_row = ref_row
# Compute insertion column based on relative position
if position == "left":
new_col = ref_col - ref_colspan
else:
new_col = ref_col + ref_colspan
# Add the new widget without triggering another shift
return self.add_widget(
widget=widget,
row=new_row,
col=new_col,
rowspan=rowspan,
colspan=colspan,
shift_existing=False,
)
if position == "left":
new_row = ref_row
new_col = ref_col - 1

View File

@@ -397,7 +397,7 @@ class DeviceInputBase(BECWidget):
object: Device object, can be device of type Device, Positioner, Signal or ComputedSignal.
"""
self.validate_device(device)
dev = getattr(self.dev, device, None)
dev = getattr(self.dev, device.lower(), None)
if dev is None:
raise ValueError(
f"Device {device} is not found in the device manager {self.dev} as enabled device."

View File

@@ -250,7 +250,7 @@ class DeviceSignalInputBase(BECWidget):
object: Device object, can be device of type Device, Positioner, Signal or ComputedSignal.
"""
self.validate_device(device)
dev = getattr(self.dev, device, None)
dev = getattr(self.dev, device.lower(), None)
if dev is None:
logger.warning(f"Device {device} not found in devicemanager.")
return None

View File

@@ -140,7 +140,7 @@ class DeviceComboBox(DeviceInputBase, QComboBox):
"""
if self.validate_device(input_text) is True:
self._is_valid_input = True
self.device_selected.emit(input_text)
self.device_selected.emit(input_text.lower())
else:
self._is_valid_input = False
self.update()

View File

@@ -147,7 +147,7 @@ class DeviceLineEdit(DeviceInputBase, QLineEdit):
"""
if self.validate_device(input_text) is True:
self._is_valid_input = True
self.device_selected.emit(input_text)
self.device_selected.emit(input_text.lower())
else:
self._is_valid_input = False
self.update()

View File

@@ -31,9 +31,6 @@ from bec_widgets.widgets.control.device_input.device_line_edit.device_line_edit
)
from bec_widgets.widgets.dap.dap_combo_box.dap_combo_box import DapComboBox
from bec_widgets.widgets.plots.waveform.curve import CurveConfig, DeviceSignal
from bec_widgets.widgets.utility.visual.color_button_native.color_button_native import (
ColorButtonNative,
)
from bec_widgets.widgets.utility.visual.colormap_widget.colormap_widget import BECColorMapWidget
if TYPE_CHECKING: # pragma: no cover
@@ -43,6 +40,49 @@ if TYPE_CHECKING: # pragma: no cover
logger = bec_logger.logger
class ColorButton(QPushButton):
"""A QPushButton subclass that displays a color.
The background is set to the given color and the button text is the hex code.
The text color is chosen automatically (black if the background is light, white if dark)
to guarantee good readability.
"""
def __init__(self, color="#000000", parent=None):
"""Initialize the color button.
Args:
color (str): The initial color in hex format (e.g., '#000000').
parent: Optional QWidget parent.
"""
super().__init__(parent)
self.set_color(color)
def set_color(self, color):
"""Set the button's color and update its appearance.
Args:
color (str or QColor): The new color to assign.
"""
if isinstance(color, QColor):
self._color = color.name()
else:
self._color = color
self._update_appearance()
def color(self):
"""Return the current color in hex."""
return self._color
def _update_appearance(self):
"""Update the button style based on the background color's brightness."""
c = QColor(self._color)
brightness = c.lightnessF()
text_color = "#000000" if brightness > 0.5 else "#FFFFFF"
self.setStyleSheet(f"background-color: {self._color}; color: {text_color};")
self.setText(self._color)
class CurveRow(QTreeWidgetItem):
DELETE_BUTTON_COLOR = "#CC181E"
"""A unified row that can represent either a device or a DAP curve.
@@ -153,7 +193,7 @@ class CurveRow(QTreeWidgetItem):
def _init_style_controls(self):
"""Create columns 3..6: color button, style combo, width spin, symbol spin."""
# Color in col 3
self.color_button = ColorButtonNative(color=self.config.color)
self.color_button = ColorButton(self.config.color)
self.color_button.clicked.connect(lambda: self._select_color(self.color_button))
self.tree.setItemWidget(self, 3, self.color_button)
@@ -244,11 +284,6 @@ class CurveRow(QTreeWidgetItem):
self.dap_combo.deleteLater()
self.dap_combo = None
if getattr(self, "color_button", None) is not None:
self.color_button.close()
self.color_button.deleteLater()
self.color_button = None
# Remove the item from the tree widget
index = self.tree.indexOfTopLevelItem(self)
if index != -1:
@@ -302,8 +337,8 @@ class CurveRow(QTreeWidgetItem):
self.config.label = f"{parent_conf.label}-{new_dap}"
# Common style fields
self.config.color = self.color_button.color
self.config.symbol_color = self.color_button.color
self.config.color = self.color_button.color()
self.config.symbol_color = self.color_button.color()
self.config.pen_style = self.style_combo.currentText()
self.config.pen_width = self.width_spin.value()
self.config.symbol_size = self.symbol_spin.value()

View File

@@ -137,6 +137,7 @@ class Waveform(PlotBase):
# Curve data
self._sync_curves = []
self._async_curves = []
self._async_connected_devices: set[str] = set()
self._slice_index = None
self._dap_curves = []
self._mode: Literal["none", "sync", "async", "mixed"] = "none"
@@ -544,6 +545,7 @@ class Waveform(PlotBase):
continue
config = CurveConfig(**cfg_dict)
self._add_curve(config=config)
self.update_with_scan_history(-1)
except json.JSONDecodeError as e:
logger.error(f"Failed to decode JSON: {e}")
@@ -1012,6 +1014,7 @@ class Waveform(PlotBase):
return
if current_scan_id != self.scan_id:
self._async_connected_devices.clear()
self.reset()
self.new_scan.emit()
self.new_scan_id.emit(current_scan_id)
@@ -1178,18 +1181,22 @@ class Waveform(PlotBase):
except KeyError:
logger.warning(f"Curve {name} not found in plot item.")
pass
self.bec_dispatcher.connect_slot(
self.on_async_readback,
MessageEndpoints.device_async_readback(self.scan_id, name),
from_start=True,
cb_info={"scan_id": self.scan_id},
)
logger.info(f"Setup async curve {name}")
# Connect only once per device signal
if name not in self._async_connected_devices:
self.bec_dispatcher.connect_slot(
self.on_async_readback,
MessageEndpoints.device_async_readback(self.scan_id, name),
from_start=True,
cb_info={"scan_id": self.scan_id},
)
self._async_connected_devices.add(name)
logger.info(f"Async read-back connected for {name}")
@SafeSlot(dict, dict, verify_sender=True)
def on_async_readback(self, msg, metadata):
"""
Get async data readback. This code needs to be fast, therefor we try
Get async data readback. This code needs to be fast; therefore, we try
to reduce the number of copies in between cycles. Be careful when refactoring
this part as it will affect the performance of the async readback.

View File

@@ -1,58 +0,0 @@
from qtpy.QtGui import QColor
from qtpy.QtWidgets import QPushButton
from bec_widgets import BECWidget, SafeProperty, SafeSlot
class ColorButtonNative(BECWidget, QPushButton):
"""A QPushButton subclass that displays a color.
The background is set to the given color and the button text is the hex code.
The text color is chosen automatically (black if the background is light, white if dark)
to guarantee good readability.
"""
RPC = False
PLUGIN = True
ICON_NAME = "colors"
def __init__(self, parent=None, color="#000000", **kwargs):
"""Initialize the color button.
Args:
parent: Optional QWidget parent.
color (str): The initial color in hex format (e.g., '#000000').
"""
super().__init__(parent=parent, **kwargs)
self.set_color(color)
@SafeSlot()
def set_color(self, color):
"""Set the button's color and update its appearance.
Args:
color (str or QColor): The new color to assign.
"""
if isinstance(color, QColor):
self._color = color.name()
else:
self._color = color
self._update_appearance()
@SafeProperty("QColor")
def color(self):
"""Return the current color in hex."""
return self._color
@color.setter
def color(self, value):
"""Set the button's color and update its appearance."""
self.set_color(value)
def _update_appearance(self):
"""Update the button style based on the background color's brightness."""
c = QColor(self._color)
brightness = c.lightnessF()
text_color = "#000000" if brightness > 0.5 else "#FFFFFF"
self.setStyleSheet(f"background-color: {self._color}; color: {text_color};")
self.setText(self._color)

View File

@@ -1 +0,0 @@
{'files': ['color_button_native.py']}

View File

@@ -1,56 +0,0 @@
# Copyright (C) 2022 The Qt Company Ltd.
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause
from qtpy.QtDesigner import QDesignerCustomWidgetInterface
from bec_widgets.utils.bec_designer import designer_material_icon
from bec_widgets.widgets.utility.visual.color_button_native.color_button_native import (
ColorButtonNative,
)
DOM_XML = """
<ui language='c++'>
<widget class='ColorButtonNative' name='color_button_native'>
</widget>
</ui>
"""
class ColorButtonNativePlugin(QDesignerCustomWidgetInterface): # pragma: no cover
def __init__(self):
super().__init__()
self._form_editor = None
def createWidget(self, parent):
t = ColorButtonNative(parent)
return t
def domXml(self):
return DOM_XML
def group(self):
return "BEC Buttons"
def icon(self):
return designer_material_icon(ColorButtonNative.ICON_NAME)
def includeFile(self):
return "color_button_native"
def initialize(self, form_editor):
self._form_editor = form_editor
def isContainer(self):
return False
def isInitialized(self):
return self._form_editor is not None
def name(self):
return "ColorButtonNative"
def toolTip(self):
return "A QPushButton subclass that displays a color."
def whatsThis(self):
return self.toolTip()

View File

@@ -1,17 +0,0 @@
def main(): # pragma: no cover
from qtpy import PYSIDE6
if not PYSIDE6:
print("PYSIDE6 is not available in the environment. Cannot patch designer.")
return
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
from bec_widgets.widgets.utility.visual.color_button_native.color_button_native_plugin import (
ColorButtonNativePlugin,
)
QPyDesignerCustomWidgetCollection.addCustomWidget(ColorButtonNativePlugin())
if __name__ == "__main__": # pragma: no cover
main()

View File

@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
[project]
name = "bec_widgets"
version = "2.3.0"
version = "2.1.2"
description = "BEC Widgets"
requires-python = ">=3.10"
classifiers = [
@@ -38,7 +38,6 @@ dev = [
"pytest-timeout~=2.2",
"pytest-xvfb~=3.0",
"pytest~=8.0",
"pytest-cov~=6.1.1",
]
[project.urls]
@@ -106,14 +105,3 @@ env = "GL_TOKEN"
[tool.semantic_release.publish]
dist_glob_patterns = ["dist/*"]
upload_to_vcs_release = true
[tool.coverage.report]
skip_empty = true # exclude empty *files*, e.g. __init__.py, from the report
exclude_also = [ # Exclude lines matching these regexes from the coverage report
"pragma: no cover",
"if TYPE_CHECKING:",
"return NotImplemented",
"raise NotImplementedError",
"\\.\\.\\.",
'if __name__ == "__main__":',
]

View File

@@ -82,52 +82,3 @@ def test_bec_connector_submit_task(bec_connector):
while not completed:
QApplication.processEvents()
time.sleep(0.1)
def test_bec_connector_change_object_name(bec_connector):
# Store the original object name and RPC register state
original_name = bec_connector.objectName()
original_gui_id = bec_connector.gui_id
# Call the method with a new name
new_name = "new_test_name"
bec_connector.change_object_name(new_name)
# Process events to allow the single shot timer to execute
QApplication.processEvents()
# Verify that the object name was changed correctly
assert bec_connector.objectName() == new_name
assert bec_connector.object_name == new_name
# Verify that the object is registered in the RPC register with the new name
assert bec_connector.rpc_register.object_is_registered(bec_connector)
# Verify that the object with the original name is no longer registered
# The object should still have the same gui_id
assert bec_connector.gui_id == original_gui_id
# Check that no object with the original name exists in the RPC register
all_objects = bec_connector.rpc_register.list_all_connections().values()
assert not any(obj.objectName() == original_name for obj in all_objects)
# Store the current name for the next test
previous_name = bec_connector.objectName()
# Test with spaces and hyphens
name_with_spaces_and_hyphens = "test name-with-hyphens"
expected_name = "test_name_with_hyphens"
bec_connector.change_object_name(name_with_spaces_and_hyphens)
# Process events to allow the single shot timer to execute
QApplication.processEvents()
# Verify that the object name was changed correctly with replacements
assert bec_connector.objectName() == expected_name
assert bec_connector.object_name == expected_name
# Verify that the object is still registered in the RPC register after the second name change
assert bec_connector.rpc_register.object_is_registered(bec_connector)
# Verify that the object with the previous name is no longer registered
all_objects = bec_connector.rpc_register.list_all_connections().values()
assert not any(obj.objectName() == previous_name for obj in all_objects)

View File

@@ -7,7 +7,7 @@ import pytest
from bec_lib.messages import ScanMessage
from bec_lib.serialization import MsgpackSerialization
from bec_widgets.utils.bec_dispatcher import QtRedisConnector, QtThreadSafeCallback
from bec_widgets.utils.bec_dispatcher import QtRedisConnector
@pytest.fixture
@@ -27,7 +27,6 @@ def bec_dispatcher_w_connector(bec_dispatcher, topics_msg_list, send_msg_event):
connector = QtRedisConnector("localhost:1", redis_class_mock)
bec_dispatcher.client.connector = connector
yield bec_dispatcher
connector.shutdown()
dummy_msg = MsgpackSerialization.dumps(ScanMessage(point_id=0, scan_id="0", data={}))
@@ -63,6 +62,7 @@ def test_dispatcher_disconnect_all(bec_dispatcher_w_connector, qtbot, send_msg_e
@pytest.mark.parametrize("topics_msg_list", [(("topic1", dummy_msg), ("topic2", dummy_msg))])
def test_dispatcher_disconnect_one(bec_dispatcher_w_connector, qtbot, send_msg_event):
# test for BEC issue #276
bec_dispatcher = bec_dispatcher_w_connector
cb1 = mock.Mock(spec=[])
cb2 = mock.Mock(spec=[])
@@ -86,21 +86,12 @@ def test_dispatcher_2_cb_same_topic(bec_dispatcher_w_connector, qtbot, send_msg_
cb1 = mock.Mock(spec=[])
cb2 = mock.Mock(spec=[])
num_slots = len(bec_dispatcher._registered_slots)
bec_dispatcher.connect_slot(cb1, "topic1")
bec_dispatcher.connect_slot(cb2, "topic1")
# The redis connector should only subscribe once to the topic
assert len(bec_dispatcher.client.connector._topics_cb) == 1
# The the given topic, two callbacks should be registered
assert len(bec_dispatcher.client.connector._topics_cb["topic1"]) == 2
# The dispatcher should have two slots
assert len(bec_dispatcher._registered_slots) == num_slots + 2
assert len(bec_dispatcher._slots) == 2
bec_dispatcher.disconnect_slot(cb1, "topic1")
assert len(bec_dispatcher._registered_slots) == num_slots + 1
assert len(bec_dispatcher._slots) == 1
send_msg_event.set()
qtbot.wait(10)
@@ -108,31 +99,9 @@ def test_dispatcher_2_cb_same_topic(bec_dispatcher_w_connector, qtbot, send_msg_
cb2.assert_called_once()
@pytest.mark.parametrize("topics_msg_list", [(("topic1", dummy_msg),)])
def test_dispatcher_2_cb_same_topic_same_slot(bec_dispatcher_w_connector, qtbot, send_msg_event):
bec_dispatcher = bec_dispatcher_w_connector
cb1 = mock.Mock(spec=[])
bec_dispatcher.connect_slot(cb1, "topic1")
bec_dispatcher.connect_slot(cb1, "topic1")
assert len(bec_dispatcher.client.connector._topics_cb) == 1
assert (
len(list(filter(lambda slot: slot.cb == cb1, bec_dispatcher._registered_slots.values())))
== 1
)
send_msg_event.set()
qtbot.wait(10)
assert cb1.call_count == 1
bec_dispatcher.disconnect_slot(cb1, "topic1")
assert (
len(list(filter(lambda slot: slot.cb == cb1, bec_dispatcher._registered_slots.values())))
== 0
)
@pytest.mark.parametrize("topics_msg_list", [(("topic1", dummy_msg), ("topic2", dummy_msg))])
def test_dispatcher_2_topic_same_cb(bec_dispatcher_w_connector, qtbot, send_msg_event):
# test for BEC issue #276
bec_dispatcher = bec_dispatcher_w_connector
cb1 = mock.Mock(spec=[])
@@ -145,36 +114,3 @@ def test_dispatcher_2_topic_same_cb(bec_dispatcher_w_connector, qtbot, send_msg_
send_msg_event.set()
qtbot.wait(10)
cb1.assert_called_once()
@pytest.mark.parametrize("topics_msg_list", [(("topic1", dummy_msg), ("topic2", dummy_msg))])
def test_dispatcher_2_topic_same_cb_with_boundmethod(
bec_dispatcher_w_connector, qtbot, send_msg_event
):
bec_dispatcher = bec_dispatcher_w_connector
class MockObject:
def mock_slot(self, msg, metadata):
pass
cb1 = MockObject()
bec_dispatcher.connect_slot(cb1.mock_slot, "topic1", {"metadata": "test"})
bec_dispatcher.connect_slot(cb1.mock_slot, "topic1", {"metadata": "test"})
def _get_slots():
return list(
filter(
lambda slot: slot == QtThreadSafeCallback(cb1.mock_slot, {"metadata": "test"}),
bec_dispatcher._registered_slots.values(),
)
)
assert len(bec_dispatcher.client.connector._topics_cb) == 1
assert len(_get_slots()) == 1
bec_dispatcher.disconnect_slot(cb1.mock_slot, "topic1")
assert len(bec_dispatcher.client.connector._topics_cb) == 0
assert len(_get_slots()) == 0
send_msg_event.set()
qtbot.wait(10)

View File

@@ -64,7 +64,7 @@ def test_launch_window_launch_ui_file_raises_for_qmainwindow(bec_launch_window):
def test_launch_window_launch_default_auto_update(bec_launch_window):
# Mock the auto update selection
bec_launch_window.tiles["auto_update"].selector.setCurrentText("Default")
bec_launch_window.tile_auto_update.selector.setCurrentText("Default")
# Call the method to launch the auto update
res = bec_launch_window._open_auto_update()
@@ -82,11 +82,11 @@ def test_launch_window_launch_plugin_auto_update(bec_launch_window):
class PluginAutoUpdate(AutoUpdates): ...
bec_launch_window.available_auto_updates = {"PluginAutoUpdate": PluginAutoUpdate}
bec_launch_window.tiles["auto_update"].selector.clear()
bec_launch_window.tiles["auto_update"].selector.addItems(
bec_launch_window.tile_auto_update.selector.clear()
bec_launch_window.tile_auto_update.selector.addItems(
list(bec_launch_window.available_auto_updates.keys()) + ["Default"]
)
bec_launch_window.tiles["auto_update"].selector.setCurrentText("PluginAutoUpdate")
bec_launch_window.tile_auto_update.selector.setCurrentText("PluginAutoUpdate")
res = bec_launch_window._open_auto_update()
assert isinstance(res, PluginAutoUpdate)
assert res.windowTitle() == "BEC - PluginAutoUpdate"

View File

@@ -1,5 +1,4 @@
from __future__ import annotations
from typing import Optional
from unittest.mock import patch
import pytest
@@ -9,8 +8,7 @@ from bec_widgets.widgets.containers.layout_manager.layout_manager import LayoutM
class MockWidgetHandler:
def create_widget(self, widget_type: str) -> QWidget | None:
def create_widget(self, widget_type: str) -> Optional[QWidget]:
if widget_type == "ButtonWidget":
return QPushButton()
elif widget_type == "LabelWidget":
@@ -227,11 +225,13 @@ def test_add_widget_overlap_with_span(layout_manager):
@pytest.mark.parametrize(
"position, expected_position",
[("left", "left"), ("right", "right"), ("top", "top"), ("bottom", "bottom")],
"position, btn3_coords",
[("left", (1, 0)), ("right", (1, 2)), ("top", (0, 1)), ("bottom", (2, 1))],
)
def test_add_widget_relative(layout_manager, position, expected_position):
def test_add_widget_relative(layout_manager, position, btn3_coords):
"""Test adding a widget relative to an existing widget using parameterized data."""
expected_row, expected_col = btn3_coords
btn1 = QPushButton("Button 1")
btn2 = QPushButton("Button 2")
btn3 = QPushButton("Button 3")
@@ -241,28 +241,9 @@ def test_add_widget_relative(layout_manager, position, expected_position):
layout_manager.add_widget_relative(btn3, reference_widget=btn2, position=position)
# Get the actual positions of the widgets
btn1_pos = layout_manager.widget_positions[btn1]
btn2_pos = layout_manager.widget_positions[btn2]
btn3_pos = layout_manager.widget_positions[btn3]
# Check that btn1 and btn2 are still in the layout
assert btn1 in layout_manager.widget_positions
assert btn2 in layout_manager.widget_positions
# Check that btn3 is positioned correctly relative to btn2
if expected_position == "left":
assert btn3_pos[1] < btn2_pos[1] # btn3's column < btn2's column
assert btn3_pos[0] == btn2_pos[0] # same row
elif expected_position == "right":
assert btn3_pos[1] > btn2_pos[1] # btn3's column > btn2's column
assert btn3_pos[0] == btn2_pos[0] # same row
elif expected_position == "top":
assert btn3_pos[0] < btn2_pos[0] # btn3's row < btn2's row
assert btn3_pos[1] == btn2_pos[1] # same column
elif expected_position == "bottom":
assert btn3_pos[0] > btn2_pos[0] # btn3's row > btn2's row
assert btn3_pos[1] == btn2_pos[1] # same column
assert layout_manager.get_widget(0, 0) == btn1
assert layout_manager.get_widget(1, 1) == btn2
assert layout_manager.get_widget(expected_row, expected_col) == btn3
def test_add_widget_relative_invalid_position(layout_manager):
@@ -385,74 +366,3 @@ def test_shift_all_widgets_up_at_top_row(layout_manager):
layout_manager.shift_all_widgets(direction="up")
assert "Shifting widgets out of grid boundaries." in str(exc_info.value)
@pytest.mark.parametrize(
"test_id, position, shift_direction, additional_assertions",
[
(
"from_left",
"left",
"right",
[
# Additional assertions for the left test case
lambda btn1_pos, btn1_new_pos, btn2_pos, btn2_new_pos, btn3_pos: btn1_new_pos[1]
> btn1_pos[1], # column shifted right
lambda btn1_pos, btn1_new_pos, btn2_pos, btn2_new_pos, btn3_pos: btn2_new_pos[1]
> btn2_pos[1], # column shifted right
lambda btn1_pos, btn1_new_pos, btn2_pos, btn2_new_pos, btn3_pos: btn3_pos[1]
< btn2_new_pos[1], # btn3 is to the left of btn2
],
),
(
"from_right",
"right",
"right",
[
# Additional assertions for the right test case
lambda btn1_pos, btn1_new_pos, btn2_pos, btn2_new_pos, btn3_pos: btn3_pos[1]
> btn2_new_pos[1] # btn3 is to the right of btn2
],
),
],
)
def test_column_shift_when_adding_widget(
layout_manager, test_id, position, shift_direction, additional_assertions
):
"""Test that adding a widget to a column of widgets shifts the entire column appropriately."""
# Create a column of widgets
btn1 = QPushButton("Button 1")
btn2 = QPushButton("Button 2")
# Add btn1 at position (0, 1)
layout_manager.add_widget(btn1, row=0, col=1)
# Add btn2 below btn1
layout_manager.add_widget_relative(btn2, reference_widget=btn1, position="bottom")
# Get the positions after initial setup
btn1_pos = layout_manager.widget_positions[btn1]
btn2_pos = layout_manager.widget_positions[btn2]
# Verify btn2 is below btn1 (same column)
assert btn1_pos[0] < btn2_pos[0] # btn2's row > btn1's row
assert btn1_pos[1] == btn2_pos[1] # same column
# Add a new button relative to btn2 with the specified position and shift_direction
btn3 = QPushButton("Button 3")
layout_manager.add_widget_relative(
btn3, reference_widget=btn2, position=position, shift_direction=shift_direction
)
# Get the updated positions
btn1_new_pos = layout_manager.widget_positions[btn1]
btn2_new_pos = layout_manager.widget_positions[btn2]
btn3_pos = layout_manager.widget_positions[btn3]
# Common assertions for both test cases
assert btn1_new_pos[1] == btn2_new_pos[1] # btn1 and btn2 still in same column
assert btn3_pos[0] == btn2_new_pos[0] # btn3 is in the same row as btn2
# Run additional assertions specific to each test case
for assertion in additional_assertions:
assertion(btn1_pos, btn1_new_pos, btn2_pos, btn2_new_pos, btn3_pos)