1
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2026-04-10 02:30:54 +02:00

Compare commits

...

33 Commits

Author SHA1 Message Date
f343a692ff WIP hardcoded device name + image disabled validation 2025-01-08 11:31:51 +01:00
e1576f41b0 WIP Tomcat demo UI connection with slider and status 2025-01-08 11:18:44 +01:00
98f86739e3 WIP Tomcat demo UI created 2025-01-08 10:28:46 +01:00
54e64c9f10 feat(widget_io): general change signal for supported widgets 2025-01-06 10:28:16 +01:00
1c8b06cbe6 refactor(rpc,client_utils): minor cleanup and type hint improvements 2024-12-23 15:59:10 +01:00
52c5286d64 fix: do not display error popup if command is executed via RPC 2024-12-23 15:59:10 +01:00
c405421db9 fix: use generator exec feature of BEC Connector to remove the AutoUpdate thread+queue 2024-12-23 15:59:10 +01:00
0ff0c06bd1 feat: add test for BECGuiClient features .new, .delete, .show, .hide, .close 2024-12-23 15:59:10 +01:00
955cc64257 fix: tests: rename fixtures and add 'connected_client_gui_obj' 2024-12-23 15:59:10 +01:00
09cb08a233 fix: prevent top-level dock areas to be destroyed with [X] button 2024-12-23 15:59:10 +01:00
5c83702382 refactor: move RPC-related classes and modules to 'rpc' directory
This allows to break circular import, too
2024-12-23 15:59:10 +01:00
1b0382524f fix: simplify AutoUpdate code thanks to threadpool executor in BEC Connector 2024-12-23 15:59:10 +01:00
92b802021f feat: add '.delete()' method to BECDockArea, make main window undeletable 2024-12-23 15:59:10 +01:00
48c140f937 fix: add .windows property to keep track of top level windows, ensure all windows are shown/hidden 2024-12-23 15:59:10 +01:00
42fd78df40 fix: remove useless class member 2024-12-23 15:59:10 +01:00
271a4a24e7 fix: determine default figure since the beginning 2024-12-23 15:59:10 +01:00
1b03ded906 fix: prevent infinite recursion in show/hide methods 2024-12-23 15:59:10 +01:00
bde5618699 feat: add "new()" command to create new dock area windows from client 2024-12-23 15:59:10 +01:00
6f2eb6b4cd fix: bec-gui-server script: fix logic with __name__ == '__main__'
When started with "bec-gui-server" entry point, __name__ is
"bec_widgets.cli.server".
When started with "python -m bec_widgets.cli.server", __name__ is
"__main__".
So, better to not rely on __name__ at all.
2024-12-23 15:59:10 +01:00
2742a3c6cf fix: set minimum size hint on BECDockArea 2024-12-23 15:59:10 +01:00
809e654087 refactor: BECGuiClientMixin -> BECGuiClient
- Mixin class was only used with BECDockArea, now it is a class by itself
which represents the client object connected to the GUI server ; ".main"
is the dock area of the main window
- Enhanced "wait_for_server"
- ".selected_device" is stored in Redis, to allow server-side to know
about the auto update configuration instead of keeping it on client
2024-12-23 15:59:10 +01:00
bdb25206d9 fix: use specified timeout in _run_rpc 2024-12-23 15:59:10 +01:00
bd5414288c build: fixed pytest bec dependency 2024-12-20 18:13:00 +01:00
95f6a7ceb7 ci: install pytest plugin from specified repo, not pypi 2024-12-20 17:37:52 +01:00
semantic-release
b75c4c88fe 1.12.0
Automatically generated by python-semantic-release
2024-12-12 10:35:17 +00:00
e38048964f feat(safe_property): added decorator to handle errors in Property decorator from qt to not crash designer 2024-12-11 22:37:03 +01:00
semantic-release
ce11d1382c 1.11.0
Automatically generated by python-semantic-release
2024-12-11 16:19:34 +00:00
ff654b56ae test(collapsible_panel_manager): fixture changed to not use .show() 2024-12-11 15:24:59 +01:00
a434d3ee57 feat(collapsible_panel_manager): panel manager to handle collapsing and expanding widgets from the main widget added 2024-12-11 15:18:43 +01:00
semantic-release
b467b29f77 1.10.0
Automatically generated by python-semantic-release
2024-12-10 19:59:55 +00:00
17a63e3b63 feat(layout_manager): grid layout manager widget 2024-12-10 20:49:19 +01:00
semantic-release
66fc5306d6 1.9.1
Automatically generated by python-semantic-release
2024-12-10 19:34:00 +00:00
6563abfddc fix(designer): general way to find python lib on linux 2024-12-10 19:12:21 +01:00
39 changed files with 3264 additions and 426 deletions

View File

@@ -61,6 +61,7 @@ stages:
- pip install -e ./ophyd_devices
- pip install -e ./bec/bec_lib[dev]
- pip install -e ./bec/bec_ipython_client
- pip install -e ./bec/pytest_bec_e2e
.install-os-packages: &install-os-packages
- apt-get update

View File

@@ -1,6 +1,45 @@
# CHANGELOG
## v1.12.0 (2024-12-12)
### Features
- **safe_property**: Added decorator to handle errors in Property decorator from qt to not crash
designer
([`e380489`](https://gitlab.psi.ch/bec/bec_widgets/-/commit/e38048964f942f9f4edba225835ad0a937503dd4))
## v1.11.0 (2024-12-11)
### Features
- **collapsible_panel_manager**: Panel manager to handle collapsing and expanding widgets from the
main widget added
([`a434d3e`](https://gitlab.psi.ch/bec/bec_widgets/-/commit/a434d3ee574081356c32c096d2fd61f641e04542))
### Testing
- **collapsible_panel_manager**: Fixture changed to not use .show()
([`ff654b5`](https://gitlab.psi.ch/bec/bec_widgets/-/commit/ff654b56ae98388a2b707c040d51220be6cbce13))
## v1.10.0 (2024-12-10)
### Features
- **layout_manager**: Grid layout manager widget
([`17a63e3`](https://gitlab.psi.ch/bec/bec_widgets/-/commit/17a63e3b639ecf6b41c379717d81339b04ef10f8))
## v1.9.1 (2024-12-10)
### Bug Fixes
- **designer**: General way to find python lib on linux
([`6563abf`](https://gitlab.psi.ch/bec/bec_widgets/-/commit/6563abfddc9fc9baba6769022d6925545decdba9))
## v1.9.0 (2024-12-10)
### Features
@@ -173,40 +212,3 @@ Depending on the test, auto-updates are enabled or not.
- **positioner_box**: Adjusted default signals
([`8e5c0ad`](https://gitlab.psi.ch/bec/bec_widgets/-/commit/8e5c0ad8c8eff5a9308169bc663d2b7230f0ebb1))
## v1.4.0 (2024-11-11)
### Bug Fixes
- **crosshair**: Label of coordinates of TextItem displays numbers in general format
([`11e5937`](https://gitlab.psi.ch/bec/bec_widgets/-/commit/11e5937ae0f3c1413acd4e66878a692ebe4ef7d0))
- **crosshair**: Label of coordinates of TextItem is updated according to the current theme of qapp
([`4f31ea6`](https://gitlab.psi.ch/bec/bec_widgets/-/commit/4f31ea655cf6190e141e6a2720a2d6da517a2b5b))
- **crosshair**: Log is separately scaled for backend logic and for signal emit
([`b2eb71a`](https://gitlab.psi.ch/bec/bec_widgets/-/commit/b2eb71aae0b6a7c82158f2d150ae1e31411cfdeb))
### Features
- **crosshair**: Textitem to display crosshair coordinates
([`035136d`](https://gitlab.psi.ch/bec/bec_widgets/-/commit/035136d5171ec5f4311d15a9aa5bad2bdbc1f6cb))
### Testing
- **crosshair**: Tests extended
([`64df805`](https://gitlab.psi.ch/bec/bec_widgets/-/commit/64df805a9ed92bb97e580ac3bc0a1bbd2b1cb81e))
## v1.3.3 (2024-11-07)
### Bug Fixes
- **scan_control**: Devicelineedit kwargs readings changed to get name of the positioner
([`5fabd4b`](https://gitlab.psi.ch/bec/bec_widgets/-/commit/5fabd4bea95bafd2352102686357cc1db80813fd))
### Documentation
- Update outdated text in docs
([`4f0693c`](https://gitlab.psi.ch/bec/bec_widgets/-/commit/4f0693cae34b391d75884837e1ae6353a0501868))

View File

@@ -0,0 +1,100 @@
import os
import numpy as np
import pyqtgraph as pg
import requests
import sys
from PySide6.QtCore import Signal, Slot
from qtpy.QtCore import QSize
from qtpy.QtGui import QActionGroup, QIcon
from qtpy.QtWidgets import QApplication, QMainWindow, QStyle
import bec_widgets
from bec_lib.client import BECClient
from bec_lib.service_config import ServiceConfig
from bec_widgets.examples.general_app.web_links import BECWebLinksMixin
from bec_widgets.qt_utils.error_popups import SafeSlot
from bec_widgets.utils.bec_dispatcher import QtRedisConnector
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.colors import apply_theme
from bec_widgets.utils.ui_loader import UILoader
MODULE_PATH = os.path.dirname(bec_widgets.__file__)
class TomcatApp(QMainWindow, BECWidget):
select_slice = Signal()
def __init__(self, parent=None, client=None, gui_id=None):
super(TomcatApp, self).__init__(parent)
BECWidget.__init__(self, client=client, gui_id=gui_id)
ui_file_path = os.path.join(os.path.dirname(__file__), "tomcat_app.ui")
self.load_ui(ui_file_path)
self.resize(1280, 720)
self.get_bec_shortcuts()
self.bec_dispatcher.connect_slot(self.test_connection, "GPU Fastapi message")
self.bec_dispatcher.connect_slot(self.status_update, "GPU Fastapi message")
self.ui.slider_select.valueChanged.connect(self.select_slice_from_slider)
self.proxy_slider = pg.SignalProxy(self.select_slice, rateLimit=2, slot=self.send_slice)
self.image_widget = self.ui.image_widget
def load_ui(self, ui_file):
loader = UILoader(self)
self.ui = loader.loader(ui_file)
self.setCentralWidget(self.ui)
def status_update(self, msg):
status = msg["data"]["GPU SVC Status"]
if status == "Running":
self.ui.radio_io.setChecked(True)
else:
self.ui.radio_io.setChecked(False)
# @SafeSlot(dict, dict)
def test_connection(self, msg):
print("Test Connection")
print(msg)
# print(metadata)
def select_slice_from_slider(self, value):
print(value)
self.select_slice.emit()
@Slot()
def send_slice(self):
value = self.ui.slider_select.value()
requests.post(
"http://ra-gpu-006:8000/api/v1/reco/single_slice",
json={"slice": value, "rot_center": 0},
)
print(f"Sending slice {value}")
def main(): # pragma: no cover
app = QApplication(sys.argv)
icon = QIcon()
icon.addFile(
os.path.join(MODULE_PATH, "assets", "app_icons", "BEC-General-App.png"), size=QSize(48, 48)
)
app.setWindowIcon(icon)
config = ServiceConfig(redis={"host": "ra-gpu-006", "port": 6379})
client = BECClient(config=config, connector_cls=QtRedisConnector)
main_window = TomcatApp(client=client)
main_window.show()
main_window.image_widget.image("RecoPreview")
# custom_data = np.random.rand(100, 100)
# main_window.image_widget._image.add_custom_image("custom", custom_data)
sys.exit(app.exec_())
if __name__ == "__main__": # pragma: no cover
main()

View File

@@ -0,0 +1,278 @@
<?xml version="1.0" encoding="UTF-8"?>
<ui version="4.0">
<class>MainWindow</class>
<widget class="QMainWindow" name="MainWindow">
<property name="geometry">
<rect>
<x>0</x>
<y>0</y>
<width>800</width>
<height>631</height>
</rect>
</property>
<property name="windowTitle">
<string>MainWindow</string>
</property>
<widget class="QWidget" name="centralwidget">
<layout class="QVBoxLayout" name="verticalLayout_2">
<item>
<widget class="BECImageWidget" name="image_widget"/>
</item>
</layout>
</widget>
<widget class="QMenuBar" name="menubar">
<property name="geometry">
<rect>
<x>0</x>
<y>0</y>
<width>800</width>
<height>29</height>
</rect>
</property>
</widget>
<widget class="QStatusBar" name="statusbar"/>
<widget class="QDockWidget" name="status_dock">
<property name="windowTitle">
<string>Status</string>
</property>
<attribute name="dockWidgetArea">
<number>1</number>
</attribute>
<widget class="QWidget" name="dockWidgetContents">
<layout class="QGridLayout" name="gridLayout">
<item row="0" column="0">
<widget class="QLabel" name="label_7">
<property name="text">
<string>Dataset loaded</string>
</property>
</widget>
</item>
<item row="0" column="1">
<widget class="QRadioButton" name="radio_dataset">
<property name="text">
<string/>
</property>
</widget>
</item>
<item row="1" column="0">
<widget class="QLabel" name="label_8">
<property name="text">
<string>I/O Service</string>
</property>
</widget>
</item>
<item row="1" column="1">
<widget class="QRadioButton" name="radio_io">
<property name="text">
<string/>
</property>
</widget>
</item>
<item row="1" column="2">
<widget class="QLineEdit" name="lineEdit_2"/>
</item>
<item row="2" column="0">
<widget class="QLabel" name="label_10">
<property name="text">
<string>Reco Service</string>
</property>
</widget>
</item>
<item row="2" column="1">
<widget class="QRadioButton" name="radioButton_4">
<property name="text">
<string/>
</property>
</widget>
</item>
<item row="2" column="2">
<widget class="QLabel" name="label_11">
<property name="text">
<string>reco-adress</string>
</property>
</widget>
</item>
<item row="3" column="0">
<widget class="QLabel" name="label_9">
<property name="text">
<string>Streaming Service</string>
</property>
</widget>
</item>
<item row="3" column="1">
<widget class="QRadioButton" name="radioButton_3">
<property name="text">
<string/>
</property>
</widget>
</item>
<item row="3" column="2">
<widget class="QLabel" name="label_12">
<property name="text">
<string>stream-adress</string>
</property>
</widget>
</item>
</layout>
</widget>
</widget>
<widget class="QDockWidget" name="dock_data">
<property name="windowTitle">
<string>Dataset</string>
</property>
<attribute name="dockWidgetArea">
<number>1</number>
</attribute>
<widget class="QWidget" name="dockWidgetContents_2">
<layout class="QVBoxLayout" name="verticalLayout">
<item>
<widget class="QLabel" name="label_2">
<property name="text">
<string>Input Dataset (full HDF5 path)</string>
</property>
<property name="alignment">
<set>Qt::AlignmentFlag::AlignCenter</set>
</property>
</widget>
</item>
<item>
<widget class="QLineEdit" name="line_edit_dataset"/>
</item>
<item>
<layout class="QHBoxLayout" name="horizontalLayout">
<item>
<widget class="QPushButton" name="button_load">
<property name="text">
<string>Load Dataset</string>
</property>
</widget>
</item>
<item>
<widget class="QPushButton" name="button_close">
<property name="text">
<string>Close Dataset</string>
</property>
</widget>
</item>
</layout>
</item>
<item>
<layout class="QHBoxLayout" name="horizontalLayout_2">
<item>
<widget class="QLabel" name="label">
<property name="text">
<string>Single Slice Live Preview</string>
</property>
</widget>
</item>
<item>
<widget class="ToggleSwitch" name="toggle_preview"/>
</item>
</layout>
</item>
<item>
<widget class="QLabel" name="label_3">
<property name="text">
<string>Select Slice</string>
</property>
</widget>
</item>
<item>
<widget class="QSlider" name="slider_select">
<property name="maximum">
<number>500</number>
</property>
<property name="orientation">
<enum>Qt::Orientation::Horizontal</enum>
</property>
</widget>
</item>
<item>
<widget class="QLabel" name="label_4">
<property name="text">
<string>Current Index</string>
</property>
</widget>
</item>
<item>
<widget class="QLabel" name="label_5">
<property name="text">
<string>Rotation center (offset from center)</string>
</property>
</widget>
</item>
<item>
<widget class="QSlider" name="slider_rotation">
<property name="orientation">
<enum>Qt::Orientation::Horizontal</enum>
</property>
</widget>
</item>
<item>
<widget class="QLabel" name="label_6">
<property name="text">
<string>Rotation Index</string>
</property>
</widget>
</item>
<item>
<layout class="QHBoxLayout" name="horizontalLayout_3">
<item>
<widget class="QPushButton" name="button_reconstruct">
<property name="text">
<string>Reconstruct</string>
</property>
</widget>
</item>
<item>
<widget class="QPushButton" name="button_single">
<property name="text">
<string>Single Slice</string>
</property>
</widget>
</item>
</layout>
</item>
<item>
<widget class="QPushButton" name="button_async">
<property name="text">
<string>Sample Async task</string>
</property>
</widget>
</item>
</layout>
</widget>
</widget>
</widget>
<customwidgets>
<customwidget>
<class>BECImageWidget</class>
<extends>QWidget</extends>
<header>bec_image_widget</header>
</customwidget>
<customwidget>
<class>ToggleSwitch</class>
<extends>QWidget</extends>
<header>toggle_switch</header>
</customwidget>
</customwidgets>
<resources/>
<connections>
<connection>
<sender>slider_select</sender>
<signal>valueChanged(int)</signal>
<receiver>label_4</receiver>
<slot>setNum(int)</slot>
<hints>
<hint type="sourcelabel">
<x>61</x>
<y>396</y>
</hint>
<hint type="destinationlabel">
<x>73</x>
<y>425</y>
</hint>
</hints>
</connection>
</connections>
</ui>

View File

@@ -27,25 +27,17 @@ class AutoUpdates:
def __init__(self, gui: BECDockArea):
self.gui = gui
self.msg_queue = Queue()
self.auto_update_thread = None
self._shutdown_sentinel = object()
self.start()
def start(self):
"""
Start the auto update thread.
"""
self.auto_update_thread = threading.Thread(target=self.process_queue)
self.auto_update_thread.start()
self._default_dock = None
self._default_fig = None
def start_default_dock(self):
"""
Create a default dock for the auto updates.
"""
dock = self.gui.add_dock("default_figure")
dock.add_widget("BECFigure")
self.dock_name = "default_figure"
self._default_dock = self.gui.add_dock(self.dock_name)
self._default_dock.add_widget("BECFigure")
self._default_fig = self._default_dock.widget_list[0]
@staticmethod
def get_scan_info(msg) -> ScanInfo:
@@ -73,15 +65,9 @@ class AutoUpdates:
"""
Get the default figure from the GUI.
"""
dock = self.gui.panels.get(self.dock_name, [])
if not dock:
return None
widgets = dock.widget_list
if not widgets:
return None
return widgets[0]
return self._default_fig
def run(self, msg):
def do_update(self, msg):
"""
Run the update function if enabled.
"""
@@ -90,20 +76,9 @@ class AutoUpdates:
if msg.status != "open":
return
info = self.get_scan_info(msg)
self.handler(info)
return self.handler(info)
def process_queue(self):
"""
Process the message queue.
"""
while True:
msg = self.msg_queue.get()
if msg is self._shutdown_sentinel:
break
self.run(msg)
@staticmethod
def get_selected_device(monitored_devices, selected_device):
def get_selected_device(self, monitored_devices, selected_device):
"""
Get the selected device for the plot. If no device is selected, the first
device in the monitored devices list is selected.
@@ -120,14 +95,11 @@ class AutoUpdates:
Default update function.
"""
if info.scan_name == "line_scan" and info.scan_report_devices:
self.simple_line_scan(info)
return
return self.simple_line_scan(info)
if info.scan_name == "grid_scan" and info.scan_report_devices:
self.simple_grid_scan(info)
return
return self.simple_grid_scan(info)
if info.scan_report_devices:
self.best_effort(info)
return
return self.best_effort(info)
def simple_line_scan(self, info: ScanInfo) -> None:
"""
@@ -137,12 +109,19 @@ class AutoUpdates:
if not fig:
return
dev_x = info.scan_report_devices[0]
dev_y = self.get_selected_device(info.monitored_devices, self.gui.selected_device)
selected_device = yield self.gui.selected_device
dev_y = self.get_selected_device(info.monitored_devices, selected_device)
if not dev_y:
return
fig.clear_all()
plt = fig.plot(x_name=dev_x, y_name=dev_y, label=f"Scan {info.scan_number} - {dev_y}")
plt.set(title=f"Scan {info.scan_number}", x_label=dev_x, y_label=dev_y)
yield fig.clear_all()
yield fig.plot(
x_name=dev_x,
y_name=dev_y,
label=f"Scan {info.scan_number} - {dev_y}",
title=f"Scan {info.scan_number}",
x_label=dev_x,
y_label=dev_y,
)
def simple_grid_scan(self, info: ScanInfo) -> None:
"""
@@ -153,12 +132,18 @@ class AutoUpdates:
return
dev_x = info.scan_report_devices[0]
dev_y = info.scan_report_devices[1]
dev_z = self.get_selected_device(info.monitored_devices, self.gui.selected_device)
fig.clear_all()
plt = fig.plot(
x_name=dev_x, y_name=dev_y, z_name=dev_z, label=f"Scan {info.scan_number} - {dev_z}"
selected_device = yield self.gui.selected_device
dev_z = self.get_selected_device(info.monitored_devices, selected_device)
yield fig.clear_all()
yield fig.plot(
x_name=dev_x,
y_name=dev_y,
z_name=dev_z,
label=f"Scan {info.scan_number} - {dev_z}",
title=f"Scan {info.scan_number}",
x_label=dev_x,
y_label=dev_y,
)
plt.set(title=f"Scan {info.scan_number}", x_label=dev_x, y_label=dev_y)
def best_effort(self, info: ScanInfo) -> None:
"""
@@ -168,17 +153,16 @@ class AutoUpdates:
if not fig:
return
dev_x = info.scan_report_devices[0]
dev_y = self.get_selected_device(info.monitored_devices, self.gui.selected_device)
selected_device = yield self.gui.selected_device
dev_y = self.get_selected_device(info.monitored_devices, selected_device)
if not dev_y:
return
fig.clear_all()
plt = fig.plot(x_name=dev_x, y_name=dev_y, label=f"Scan {info.scan_number} - {dev_y}")
plt.set(title=f"Scan {info.scan_number}", x_label=dev_x, y_label=dev_y)
def shutdown(self):
"""
Shutdown the auto update thread.
"""
self.msg_queue.put(self._shutdown_sentinel)
if self.auto_update_thread:
self.auto_update_thread.join()
yield fig.clear_all()
yield fig.plot(
x_name=dev_x,
y_name=dev_y,
label=f"Scan {info.scan_number} - {dev_y}",
title=f"Scan {info.scan_number}",
x_label=dev_x,
y_label=dev_y,
)

View File

@@ -5,7 +5,7 @@ from __future__ import annotations
import enum
from typing import Literal, Optional, overload
from bec_widgets.cli.client_utils import BECGuiClientMixin, RPCBase, rpc_call
from bec_widgets.cli.rpc.rpc_base import RPCBase, rpc_call
# pylint: skip-file
@@ -342,7 +342,7 @@ class BECDock(RPCBase):
"""
class BECDockArea(RPCBase, BECGuiClientMixin):
class BECDockArea(RPCBase):
@property
@rpc_call
def _config_dict(self) -> "dict":
@@ -353,6 +353,13 @@ class BECDockArea(RPCBase, BECGuiClientMixin):
dict: The configuration of the widget.
"""
@property
@rpc_call
def selected_device(self) -> "str":
"""
None
"""
@property
@rpc_call
def panels(self) -> "dict[str, BECDock]":
@@ -480,6 +487,12 @@ class BECDockArea(RPCBase, BECGuiClientMixin):
Hide all windows including floating docks.
"""
@rpc_call
def delete(self):
"""
None
"""
class BECFigure(RPCBase):
@property

View File

@@ -7,61 +7,33 @@ import os
import select
import subprocess
import threading
import time
import uuid
from functools import wraps
from contextlib import contextmanager
from dataclasses import dataclass
from typing import TYPE_CHECKING
from bec_lib.client import BECClient
from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from bec_lib.utils.import_utils import isinstance_based_on_class_name, lazy_import, lazy_import_from
import bec_widgets.cli.client as client
from bec_widgets.cli.auto_updates import AutoUpdates
from bec_widgets.cli.rpc.rpc_base import RPCBase
if TYPE_CHECKING:
from bec_lib import messages
from bec_lib.connector import MessageObject
from bec_lib.device import DeviceBase
messages = lazy_import("bec_lib.messages")
# from bec_lib.connector import MessageObject
MessageObject = lazy_import_from("bec_lib.connector", ("MessageObject",))
BECDispatcher = lazy_import_from("bec_widgets.utils.bec_dispatcher", ("BECDispatcher",))
from bec_widgets.utils.bec_dispatcher import BECDispatcher
else:
messages = lazy_import("bec_lib.messages")
# from bec_lib.connector import MessageObject
MessageObject = lazy_import_from("bec_lib.connector", ("MessageObject",))
BECDispatcher = lazy_import_from("bec_widgets.utils.bec_dispatcher", ("BECDispatcher",))
logger = bec_logger.logger
def rpc_call(func):
"""
A decorator for calling a function on the server.
Args:
func: The function to call.
Returns:
The result of the function call.
"""
@wraps(func)
def wrapper(self, *args, **kwargs):
# we could rely on a strict type check here, but this is more flexible
# moreover, it would anyway crash for objects...
out = []
for arg in args:
if hasattr(arg, "name"):
arg = arg.name
out.append(arg)
args = tuple(out)
for key, val in kwargs.items():
if hasattr(val, "name"):
kwargs[key] = val.name
if not self.gui_is_alive():
raise RuntimeError("GUI is not alive")
return self._run_rpc(func.__name__, *args, **kwargs)
return wrapper
def _get_output(process, logger) -> None:
log_func = {process.stdout: logger.debug, process.stderr: logger.error}
stream_buffer = {process.stdout: [], process.stderr: []}
@@ -132,29 +104,79 @@ class RepeatTimer(threading.Timer):
self.function(*self.args, **self.kwargs)
class BECGuiClientMixin:
@contextmanager
def wait_for_server(client):
timeout = client._startup_timeout
if not timeout:
if client.gui_is_alive():
# there is hope, let's wait a bit
timeout = 1
else:
raise RuntimeError("GUI is not alive")
try:
if client._gui_started_event.wait(timeout=timeout):
client._gui_started_timer.cancel()
client._gui_started_timer.join()
else:
raise TimeoutError("Could not connect to GUI server")
finally:
# after initial waiting period, do not wait so much any more
# (only relevant if GUI didn't start)
client._startup_timeout = 0
yield
### ----------------------------
### NOTE
### it is far easier to extend the 'delete' method on the client side,
### to know when the client is deleted, rather than listening to server
### to get notified. However, 'generate_cli.py' cannot add extra stuff
### in the generated client module. So, here a class with the same name
### is created, and client module is patched.
class BECDockArea(client.BECDockArea):
def delete(self):
if self is BECGuiClient._top_level["main"].widget:
raise RuntimeError("Cannot delete main window")
super().delete()
try:
del BECGuiClient._top_level[self._gui_id]
except KeyError:
# if a dock area is not at top level
pass
client.BECDockArea = BECDockArea
### ----------------------------
@dataclass
class WidgetDesc:
title: str
widget: BECDockArea
class BECGuiClient(RPCBase):
_top_level = {}
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
self._auto_updates_enabled = True
self._auto_updates = None
self._startup_timeout = 0
self._gui_started_timer = None
self._gui_started_event = threading.Event()
self._process = None
self._process_output_processing_thread = None
self._target_endpoint = MessageEndpoints.scan_status()
self._selected_device = None
@property
def windows(self):
return self._top_level
@property
def auto_updates(self):
if self._auto_updates_enabled:
self._gui_started_event.wait()
return self._auto_updates
def shutdown_auto_updates(self):
if self._auto_updates_enabled:
if self._auto_updates is not None:
self._auto_updates.shutdown()
self._auto_updates = None
with wait_for_server(self):
return self._auto_updates
def _get_update_script(self) -> AutoUpdates | None:
eps = imd.entry_points(group="bec.widgets.auto_updates")
@@ -175,49 +197,59 @@ class BECGuiClientMixin:
"""
Selected device for the plot.
"""
return self._selected_device
auto_update_config_ep = MessageEndpoints.gui_auto_update_config(self._gui_id)
auto_update_config = self._client.connector.get(auto_update_config_ep)
if auto_update_config:
return auto_update_config.selected_device
return None
@selected_device.setter
def selected_device(self, device: str | DeviceBase):
if isinstance_based_on_class_name(device, "bec_lib.device.DeviceBase"):
self._selected_device = device.name
self._client.connector.set_and_publish(
MessageEndpoints.gui_auto_update_config(self._gui_id),
messages.GUIAutoUpdateConfigMessage(selected_device=device.name),
)
elif isinstance(device, str):
self._selected_device = device
self._client.connector.set_and_publish(
MessageEndpoints.gui_auto_update_config(self._gui_id),
messages.GUIAutoUpdateConfigMessage(selected_device=device),
)
else:
raise ValueError("Device must be a string or a device object")
def _start_update_script(self) -> None:
self._client.connector.register(
self._target_endpoint, cb=self._handle_msg_update, parent=self
)
self._client.connector.register(MessageEndpoints.scan_status(), cb=self._handle_msg_update)
@staticmethod
def _handle_msg_update(msg: MessageObject, parent: BECGuiClientMixin) -> None:
if parent.auto_updates is not None:
def _handle_msg_update(self, msg: MessageObject) -> None:
if self.auto_updates is not None:
# pylint: disable=protected-access
parent._update_script_msg_parser(msg.value)
return self._update_script_msg_parser(msg.value)
def _update_script_msg_parser(self, msg: messages.BECMessage) -> None:
if isinstance(msg, messages.ScanStatusMessage):
if not self.gui_is_alive():
return
if self._auto_updates_enabled:
self.auto_updates.msg_queue.put(msg)
return self.auto_updates.do_update(msg)
def _gui_post_startup(self):
self._top_level["main"] = WidgetDesc(
title="BEC Widgets", widget=BECDockArea(gui_id=self._gui_id)
)
if self._auto_updates_enabled:
if self._auto_updates is None:
auto_updates = self._get_update_script()
if auto_updates is None:
AutoUpdates.create_default_dock = True
AutoUpdates.enabled = True
auto_updates = AutoUpdates(gui=self)
auto_updates = AutoUpdates(self._top_level["main"].widget)
if auto_updates.create_default_dock:
auto_updates.start_default_dock()
# fig = auto_updates.get_default_figure()
self._start_update_script()
self._auto_updates = auto_updates
self._do_show_all()
self._gui_started_event.set()
self.show_all()
def start_server(self, wait=False) -> None:
"""
@@ -225,8 +257,8 @@ class BECGuiClientMixin:
"""
if self._process is None or self._process.poll() is not None:
logger.success("GUI starting...")
self._startup_timeout = 5
self._gui_started_event.clear()
self._start_update_script()
self._process, self._process_output_processing_thread = _start_plot_process(
self._gui_id, self.__class__, self._client._service_config.config, logger=logger
)
@@ -239,27 +271,66 @@ class BECGuiClientMixin:
threading.current_thread().cancel()
self._gui_started_timer = RepeatTimer(
1, lambda: self.gui_is_alive() and gui_started_callback(self._gui_post_startup)
0.5, lambda: self.gui_is_alive() and gui_started_callback(self._gui_post_startup)
)
self._gui_started_timer.start()
if wait:
self._gui_started_event.wait()
def show_all(self):
self._gui_started_event.wait()
def _dump(self):
rpc_client = RPCBase(gui_id=f"{self._gui_id}:window", parent=self)
return rpc_client._run_rpc("_dump")
def start(self):
return self.start_server()
def _do_show_all(self):
rpc_client = RPCBase(gui_id=f"{self._gui_id}:window", parent=self)
rpc_client._run_rpc("show")
for window in self._top_level.values():
window.widget.show()
def show_all(self):
with wait_for_server(self):
return self._do_show_all()
def hide_all(self):
self._gui_started_event.wait()
rpc_client = RPCBase(gui_id=f"{self._gui_id}:window", parent=self)
rpc_client._run_rpc("hide")
with wait_for_server(self):
rpc_client = RPCBase(gui_id=f"{self._gui_id}:window", parent=self)
rpc_client._run_rpc("hide")
for window in self._top_level.values():
window.widget.hide()
def show(self):
if self._process is not None:
return self.show_all()
# backward compatibility: show() was also starting server
return self.start_server(wait=True)
def hide(self):
return self.hide_all()
@property
def main(self):
"""Return client to main dock area (in main window)"""
with wait_for_server(self):
return self._top_level["main"].widget
def new(self, title):
"""Ask main window to create a new top-level dock area"""
with wait_for_server(self):
rpc_client = RPCBase(gui_id=f"{self._gui_id}:window", parent=self)
widget = rpc_client._run_rpc("new_dock_area", title)
self._top_level[widget._gui_id] = WidgetDesc(title=title, widget=widget)
return widget
def close(self) -> None:
"""
Close the gui window.
"""
self._top_level.clear()
if self._gui_started_timer is not None:
self._gui_started_timer.cancel()
self._gui_started_timer.join()
@@ -274,130 +345,3 @@ class BECGuiClientMixin:
self._process_output_processing_thread.join()
self._process.wait()
self._process = None
self.shutdown_auto_updates()
class RPCResponseTimeoutError(Exception):
"""Exception raised when an RPC response is not received within the expected time."""
def __init__(self, request_id, timeout):
super().__init__(
f"RPC response not received within {timeout} seconds for request ID {request_id}"
)
class RPCBase:
def __init__(self, gui_id: str = None, config: dict = None, parent=None) -> None:
self._client = BECClient() # BECClient is a singleton; here, we simply get the instance
self._config = config if config is not None else {}
self._gui_id = gui_id if gui_id is not None else str(uuid.uuid4())[:5]
self._parent = parent
self._msg_wait_event = threading.Event()
self._rpc_response = None
super().__init__()
# print(f"RPCBase: {self._gui_id}")
def __repr__(self):
type_ = type(self)
qualname = type_.__qualname__
return f"<{qualname} object at {hex(id(self))}>"
@property
def _root(self):
"""
Get the root widget. This is the BECFigure widget that holds
the anchor gui_id.
"""
parent = self
# pylint: disable=protected-access
while parent._parent is not None:
parent = parent._parent
return parent
def _run_rpc(self, method, *args, wait_for_rpc_response=True, timeout=3, **kwargs):
"""
Run the RPC call.
Args:
method: The method to call.
args: The arguments to pass to the method.
wait_for_rpc_response: Whether to wait for the RPC response.
kwargs: The keyword arguments to pass to the method.
Returns:
The result of the RPC call.
"""
request_id = str(uuid.uuid4())
rpc_msg = messages.GUIInstructionMessage(
action=method,
parameter={"args": args, "kwargs": kwargs, "gui_id": self._gui_id},
metadata={"request_id": request_id},
)
# pylint: disable=protected-access
receiver = self._root._gui_id
if wait_for_rpc_response:
self._rpc_response = None
self._msg_wait_event.clear()
self._client.connector.register(
MessageEndpoints.gui_instruction_response(request_id),
cb=self._on_rpc_response,
parent=self,
)
self._client.connector.set_and_publish(MessageEndpoints.gui_instructions(receiver), rpc_msg)
if wait_for_rpc_response:
try:
finished = self._msg_wait_event.wait(10)
if not finished:
raise RPCResponseTimeoutError(request_id, timeout)
finally:
self._msg_wait_event.clear()
self._client.connector.unregister(
MessageEndpoints.gui_instruction_response(request_id), cb=self._on_rpc_response
)
# get class name
if not self._rpc_response.accepted:
raise ValueError(self._rpc_response.message["error"])
msg_result = self._rpc_response.message.get("result")
self._rpc_response = None
return self._create_widget_from_msg_result(msg_result)
@staticmethod
def _on_rpc_response(msg: MessageObject, parent: RPCBase) -> None:
msg = msg.value
parent._msg_wait_event.set()
parent._rpc_response = msg
def _create_widget_from_msg_result(self, msg_result):
if msg_result is None:
return None
if isinstance(msg_result, list):
return [self._create_widget_from_msg_result(res) for res in msg_result]
if isinstance(msg_result, dict):
if "__rpc__" not in msg_result:
return {
key: self._create_widget_from_msg_result(val) for key, val in msg_result.items()
}
cls = msg_result.pop("widget_class", None)
msg_result.pop("__rpc__", None)
if not cls:
return msg_result
cls = getattr(client, cls)
# print(msg_result)
return cls(parent=self, **msg_result)
return msg_result
def gui_is_alive(self):
"""
Check if the GUI is alive.
"""
heart = self._client.connector.get(MessageEndpoints.gui_heartbeat(self._root._gui_id))
if heart is None:
return False
if heart.status == messages.BECStatus.RUNNING:
return True
return False

View File

@@ -35,7 +35,7 @@ from __future__ import annotations
import enum
from typing import Literal, Optional, overload
from bec_widgets.cli.client_utils import RPCBase, rpc_call, BECGuiClientMixin
from bec_widgets.cli.rpc.rpc_base import RPCBase, rpc_call
# pylint: skip-file"""
@@ -84,7 +84,7 @@ class Widgets(str, enum.Enum):
# Generate the content
if cls.__name__ == "BECDockArea":
self.content += f"""
class {class_name}(RPCBase, BECGuiClientMixin):"""
class {class_name}(RPCBase):"""
else:
self.content += f"""
class {class_name}(RPCBase):"""

View File

View File

@@ -0,0 +1,177 @@
from __future__ import annotations
import threading
import uuid
from functools import wraps
from typing import TYPE_CHECKING
from bec_lib.client import BECClient
from bec_lib.endpoints import MessageEndpoints
from bec_lib.utils.import_utils import lazy_import, lazy_import_from
import bec_widgets.cli.client as client
if TYPE_CHECKING:
from bec_lib import messages
from bec_lib.connector import MessageObject
else:
messages = lazy_import("bec_lib.messages")
# from bec_lib.connector import MessageObject
MessageObject = lazy_import_from("bec_lib.connector", ("MessageObject",))
def rpc_call(func):
"""
A decorator for calling a function on the server.
Args:
func: The function to call.
Returns:
The result of the function call.
"""
@wraps(func)
def wrapper(self, *args, **kwargs):
# we could rely on a strict type check here, but this is more flexible
# moreover, it would anyway crash for objects...
out = []
for arg in args:
if hasattr(arg, "name"):
arg = arg.name
out.append(arg)
args = tuple(out)
for key, val in kwargs.items():
if hasattr(val, "name"):
kwargs[key] = val.name
if not self.gui_is_alive():
raise RuntimeError("GUI is not alive")
return self._run_rpc(func.__name__, *args, **kwargs)
return wrapper
class RPCResponseTimeoutError(Exception):
"""Exception raised when an RPC response is not received within the expected time."""
def __init__(self, request_id, timeout):
super().__init__(
f"RPC response not received within {timeout} seconds for request ID {request_id}"
)
class RPCBase:
def __init__(self, gui_id: str = None, config: dict = None, parent=None) -> None:
self._client = BECClient() # BECClient is a singleton; here, we simply get the instance
self._config = config if config is not None else {}
self._gui_id = gui_id if gui_id is not None else str(uuid.uuid4())[:5]
self._parent = parent
self._msg_wait_event = threading.Event()
self._rpc_response = None
super().__init__()
# print(f"RPCBase: {self._gui_id}")
def __repr__(self):
type_ = type(self)
qualname = type_.__qualname__
return f"<{qualname} object at {hex(id(self))}>"
@property
def _root(self):
"""
Get the root widget. This is the BECFigure widget that holds
the anchor gui_id.
"""
parent = self
# pylint: disable=protected-access
while parent._parent is not None:
parent = parent._parent
return parent
def _run_rpc(self, method, *args, wait_for_rpc_response=True, timeout=3, **kwargs):
"""
Run the RPC call.
Args:
method: The method to call.
args: The arguments to pass to the method.
wait_for_rpc_response: Whether to wait for the RPC response.
kwargs: The keyword arguments to pass to the method.
Returns:
The result of the RPC call.
"""
request_id = str(uuid.uuid4())
rpc_msg = messages.GUIInstructionMessage(
action=method,
parameter={"args": args, "kwargs": kwargs, "gui_id": self._gui_id},
metadata={"request_id": request_id},
)
# pylint: disable=protected-access
receiver = self._root._gui_id
if wait_for_rpc_response:
self._rpc_response = None
self._msg_wait_event.clear()
self._client.connector.register(
MessageEndpoints.gui_instruction_response(request_id),
cb=self._on_rpc_response,
parent=self,
)
self._client.connector.set_and_publish(MessageEndpoints.gui_instructions(receiver), rpc_msg)
if wait_for_rpc_response:
try:
finished = self._msg_wait_event.wait(timeout)
if not finished:
raise RPCResponseTimeoutError(request_id, timeout)
finally:
self._msg_wait_event.clear()
self._client.connector.unregister(
MessageEndpoints.gui_instruction_response(request_id), cb=self._on_rpc_response
)
# get class name
if not self._rpc_response.accepted:
raise ValueError(self._rpc_response.message["error"])
msg_result = self._rpc_response.message.get("result")
self._rpc_response = None
return self._create_widget_from_msg_result(msg_result)
@staticmethod
def _on_rpc_response(msg: MessageObject, parent: RPCBase) -> None:
msg = msg.value
parent._msg_wait_event.set()
parent._rpc_response = msg
def _create_widget_from_msg_result(self, msg_result):
if msg_result is None:
return None
if isinstance(msg_result, list):
return [self._create_widget_from_msg_result(res) for res in msg_result]
if isinstance(msg_result, dict):
if "__rpc__" not in msg_result:
return {
key: self._create_widget_from_msg_result(val) for key, val in msg_result.items()
}
cls = msg_result.pop("widget_class", None)
msg_result.pop("__rpc__", None)
if not cls:
return msg_result
cls = getattr(client, cls)
# print(msg_result)
return cls(parent=self, **msg_result)
return msg_result
def gui_is_alive(self):
"""
Check if the GUI is alive.
"""
heart = self._client.connector.get(MessageEndpoints.gui_heartbeat(self._root._gui_id))
if heart is None:
return False
if heart.status == messages.BECStatus.RUNNING:
return True
return False

View File

@@ -1,9 +1,11 @@
from __future__ import annotations
import functools
import json
import signal
import sys
from contextlib import redirect_stderr, redirect_stdout
import types
from contextlib import contextmanager, redirect_stderr, redirect_stdout
from typing import Union
from bec_lib.endpoints import MessageEndpoints
@@ -12,7 +14,8 @@ from bec_lib.service_config import ServiceConfig
from bec_lib.utils.import_utils import lazy_import
from qtpy.QtCore import Qt, QTimer
from bec_widgets.cli.rpc_register import RPCRegister
from bec_widgets.cli.rpc.rpc_register import RPCRegister
from bec_widgets.qt_utils.error_popups import ErrorPopupUtility
from bec_widgets.utils import BECDispatcher
from bec_widgets.utils.bec_connector import BECConnector
from bec_widgets.widgets.containers.dock import BECDockArea
@@ -23,6 +26,27 @@ messages = lazy_import("bec_lib.messages")
logger = bec_logger.logger
@contextmanager
def rpc_exception_hook(err_func):
"""This context replaces the popup message box for error display with a specific hook"""
# get error popup utility singleton
popup = ErrorPopupUtility()
# save current setting
old_exception_hook = popup.custom_exception_hook
# install err_func, if it is a callable
def custom_exception_hook(self, exc_type, value, tb, **kwargs):
err_func({"error": popup.get_error_message(exc_type, value, tb)})
popup.custom_exception_hook = types.MethodType(custom_exception_hook, popup)
try:
yield popup
finally:
# restore state of error popup utility singleton
popup.custom_exception_hook = old_exception_hook
class BECWidgetsCLIServer:
def __init__(
@@ -57,18 +81,19 @@ class BECWidgetsCLIServer:
def on_rpc_update(self, msg: dict, metadata: dict):
request_id = metadata.get("request_id")
logger.debug(f"Received RPC instruction: {msg}, metadata: {metadata}")
try:
obj = self.get_object_from_config(msg["parameter"])
method = msg["action"]
args = msg["parameter"].get("args", [])
kwargs = msg["parameter"].get("kwargs", {})
res = self.run_rpc(obj, method, args, kwargs)
except Exception as e:
logger.error(f"Error while executing RPC instruction: {e}")
self.send_response(request_id, False, {"error": str(e)})
else:
logger.debug(f"RPC instruction executed successfully: {res}")
self.send_response(request_id, True, {"result": res})
with rpc_exception_hook(functools.partial(self.send_response, request_id, False)):
try:
obj = self.get_object_from_config(msg["parameter"])
method = msg["action"]
args = msg["parameter"].get("args", [])
kwargs = msg["parameter"].get("kwargs", {})
res = self.run_rpc(obj, method, args, kwargs)
except Exception as e:
logger.error(f"Error while executing RPC instruction: {e}")
self.send_response(request_id, False, {"error": str(e)})
else:
logger.debug(f"RPC instruction executed successfully: {res}")
self.send_response(request_id, True, {"result": res})
def send_response(self, request_id: str, accepted: bool, msg: dict):
self.client.connector.set_and_publish(
@@ -181,14 +206,8 @@ def main():
import bec_widgets
bec_logger.level = bec_logger.LOGLEVEL.DEBUG
if __name__ != "__main__":
# if not running as main, set the log level to critical
# pylint: disable=protected-access
bec_logger._stderr_log_level = bec_logger.LOGLEVEL.CRITICAL
parser = argparse.ArgumentParser(description="BEC Widgets CLI Server")
parser.add_argument("--id", type=str, help="The id of the server")
parser.add_argument("--id", type=str, default="test", help="The id of the server")
parser.add_argument(
"--gui_class",
type=str,
@@ -199,10 +218,20 @@ def main():
args = parser.parse_args()
if args.gui_class == "BECFigure":
gui_class = BECFigure
elif args.gui_class == "BECDockArea":
if args.hide:
# if we start hidden, it means we are under control of the client
# -> set the log level to critical to not see all the messages
# pylint: disable=protected-access
# bec_logger._stderr_log_level = bec_logger.LOGLEVEL.CRITICAL
bec_logger.level = bec_logger.LOGLEVEL.CRITICAL
else:
# verbose log
bec_logger.level = bec_logger.LOGLEVEL.DEBUG
if args.gui_class == "BECDockArea":
gui_class = BECDockArea
elif args.gui_class == "BECFigure":
gui_class = BECFigure
else:
print(
"Please specify a valid gui_class to run. Use -h for help."
@@ -213,8 +242,10 @@ def main():
with redirect_stdout(SimpleFileLikeFromLogOutputFunc(logger.info)):
with redirect_stderr(SimpleFileLikeFromLogOutputFunc(logger.error)):
app = QApplication(sys.argv)
app.setQuitOnLastWindowClosed(False)
app.setApplicationName("BEC Figure")
# set close on last window, only if not under control of client ;
# indeed, Qt considers a hidden window a closed window, so if all windows
# are hidden by default it exits
app.setQuitOnLastWindowClosed(not args.hide)
module_path = os.path.dirname(bec_widgets.__file__)
icon = QIcon()
icon.addFile(
@@ -222,6 +253,8 @@ def main():
size=QSize(48, 48),
)
app.setWindowIcon(icon)
# store gui id within QApplication object, to make it available to all widgets
app.gui_id = args.id
server = _start_server(args.id, gui_class, args.config)
@@ -233,7 +266,6 @@ def main():
gui = server.gui
win.setCentralWidget(gui)
win.resize(800, 600)
if not args.hide:
win.show()
@@ -242,6 +274,12 @@ def main():
def sigint_handler(*args):
# display message, for people to let it terminate gracefully
print("Caught SIGINT, exiting")
# first hide all top level windows
# this is to discriminate the cases between "user clicks on [X]"
# (which should be filtered, to not close -see BECDockArea-)
# or "app is asked to close"
for window in app.topLevelWidgets():
window.hide() # so, we know we can exit because it is hidden
app.quit()
signal.signal(signal.SIGINT, sigint_handler)
@@ -250,6 +288,5 @@ def main():
sys.exit(app.exec())
if __name__ == "__main__": # pragma: no cover
sys.argv = ["bec_widgets.cli.server", "--id", "e2860", "--gui_class", "BECDockArea"]
if __name__ == "__main__":
main()

View File

@@ -7,6 +7,7 @@ from qtpy.QtWidgets import (
QApplication,
QGroupBox,
QHBoxLayout,
QPushButton,
QSplitter,
QTabWidget,
QVBoxLayout,
@@ -17,6 +18,7 @@ from bec_widgets.utils import BECDispatcher
from bec_widgets.utils.colors import apply_theme
from bec_widgets.widgets.containers.dock import BECDockArea
from bec_widgets.widgets.containers.figure import BECFigure
from bec_widgets.widgets.containers.layout_manager.layout_manager import LayoutManagerWidget
from bec_widgets.widgets.editors.jupyter_console.jupyter_console import BECJupyterConsole
@@ -50,11 +52,16 @@ class JupyterConsoleWindow(QWidget): # pragma: no cover:
"d1": self.d1,
"d2": self.d2,
"wave": self.wf,
# "bar": self.bar,
# "cm": self.colormap,
"im": self.im,
"mm": self.mm,
"mw": self.mw,
"lm": self.lm,
"btn1": self.btn1,
"btn2": self.btn2,
"btn3": self.btn3,
"btn4": self.btn4,
"btn5": self.btn5,
"btn6": self.btn6,
}
)
@@ -79,11 +86,25 @@ class JupyterConsoleWindow(QWidget): # pragma: no cover:
second_tab_layout.addWidget(self.figure)
tab_widget.addTab(second_tab, "BEC Figure")
third_tab = QWidget()
third_tab_layout = QVBoxLayout(third_tab)
self.lm = LayoutManagerWidget()
third_tab_layout.addWidget(self.lm)
tab_widget.addTab(third_tab, "Layout Manager Widget")
group_box = QGroupBox("Jupyter Console", splitter)
group_box_layout = QVBoxLayout(group_box)
self.console = BECJupyterConsole(inprocess=True)
group_box_layout.addWidget(self.console)
# Some buttons for layout testing
self.btn1 = QPushButton("Button 1")
self.btn2 = QPushButton("Button 2")
self.btn3 = QPushButton("Button 3")
self.btn4 = QPushButton("Button 4")
self.btn5 = QPushButton("Button 5")
self.btn6 = QPushButton("Button 6")
# add stuff to figure
self._init_figure()
@@ -93,15 +114,7 @@ class JupyterConsoleWindow(QWidget): # pragma: no cover:
self.setWindowTitle("Jupyter Console Window")
def _init_figure(self):
self.w1 = self.figure.plot(
x_name="samx",
y_name="bpm4i",
# title="Standard Plot with sync device, custom labels - w1",
# x_label="Motor Position",
# y_label="Intensity (A.U.)",
row=0,
col=0,
)
self.w1 = self.figure.plot(x_name="samx", y_name="bpm4i", row=0, col=0)
self.w1.set(
title="Standard Plot with sync device, custom labels - w1",
x_label="Motor Position",
@@ -169,14 +182,6 @@ class JupyterConsoleWindow(QWidget): # pragma: no cover:
self.wf = self.d2.add_widget("BECFigure", row=0, col=0)
self.mw = self.wf.multi_waveform(monitor="waveform") # , config=config)
# self.wf.plot(x_name="samx", y_name="bpm3a")
# self.wf.plot(x_name="samx", y_name="bpm4i", dap="GaussianModel")
# self.bar = self.d2.add_widget("RingProgressBar", row=0, col=1)
# self.bar.set_diameter(200)
# self.d3 = self.dock.add_dock(name="dock_3", position="bottom")
# self.colormap = pg.GradientWidget()
# self.d3.add_widget(self.colormap, row=0, col=0)
self.dock.save_state()

View File

@@ -0,0 +1,380 @@
from __future__ import annotations
import sys
from typing import Literal
import pyqtgraph as pg
from qtpy.QtCore import Property, QEasingCurve, QObject, QPropertyAnimation
from qtpy.QtWidgets import (
QApplication,
QHBoxLayout,
QMainWindow,
QPushButton,
QSizePolicy,
QVBoxLayout,
QWidget,
)
from typeguard import typechecked
from bec_widgets.widgets.containers.layout_manager.layout_manager import LayoutManagerWidget
class DimensionAnimator(QObject):
"""
Helper class to animate the size of a panel widget.
"""
def __init__(self, panel_widget: QWidget, direction: str):
super().__init__()
self.panel_widget = panel_widget
self.direction = direction
self._size = 0
@Property(int)
def panel_width(self):
"""
Returns the current width of the panel widget.
"""
return self._size
@panel_width.setter
def panel_width(self, val: int):
"""
Set the width of the panel widget.
Args:
val(int): The width to set.
"""
self._size = val
self.panel_widget.setFixedWidth(val)
@Property(int)
def panel_height(self):
"""
Returns the current height of the panel widget.
"""
return self._size
@panel_height.setter
def panel_height(self, val: int):
"""
Set the height of the panel widget.
Args:
val(int): The height to set.
"""
self._size = val
self.panel_widget.setFixedHeight(val)
class CollapsiblePanelManager(QObject):
"""
Manager class to handle collapsible panels from a main widget using LayoutManagerWidget.
"""
def __init__(self, layout_manager: LayoutManagerWidget, reference_widget: QWidget, parent=None):
super().__init__(parent)
self.layout_manager = layout_manager
self.reference_widget = reference_widget
self.animations = {}
self.panels = {}
self.direction_settings = {
"left": {"property": b"maximumWidth", "default_size": 200},
"right": {"property": b"maximumWidth", "default_size": 200},
"top": {"property": b"maximumHeight", "default_size": 150},
"bottom": {"property": b"maximumHeight", "default_size": 150},
}
def add_panel(
self,
direction: Literal["left", "right", "top", "bottom"],
panel_widget: QWidget,
target_size: int | None = None,
duration: int = 300,
):
"""
Add a panel widget to the layout manager.
Args:
direction(Literal["left", "right", "top", "bottom"]): Direction of the panel.
panel_widget(QWidget): The panel widget to add.
target_size(int, optional): The target size of the panel. Defaults to None.
duration(int): The duration of the animation in milliseconds. Defaults to 300.
"""
if direction not in self.direction_settings:
raise ValueError("Direction must be one of 'left', 'right', 'top', 'bottom'.")
if target_size is None:
target_size = self.direction_settings[direction]["default_size"]
self.layout_manager.add_widget_relative(
widget=panel_widget, reference_widget=self.reference_widget, position=direction
)
panel_widget.setVisible(False)
# Set initial constraints as flexible
if direction in ["left", "right"]:
panel_widget.setMaximumWidth(0)
panel_widget.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
else:
panel_widget.setMaximumHeight(0)
panel_widget.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
self.panels[direction] = {
"widget": panel_widget,
"direction": direction,
"target_size": target_size,
"duration": duration,
"animator": None,
}
def toggle_panel(
self,
direction: Literal["left", "right", "top", "bottom"],
target_size: int | None = None,
duration: int | None = None,
easing_curve: QEasingCurve = QEasingCurve.InOutQuad,
ensure_max: bool = False,
scale: float | None = None,
animation: bool = True,
):
"""
Toggle the specified panel.
Parameters:
direction (Literal["left", "right", "top", "bottom"]): Direction of the panel to toggle.
target_size (int, optional): Override target size for this toggle.
duration (int, optional): Override the animation duration.
easing_curve (QEasingCurve): Animation easing curve.
ensure_max (bool): If True, animate as a fixed-size panel.
scale (float, optional): If provided, calculate target_size from main widget size.
animation (bool): If False, no animation is performed; panel instantly toggles.
"""
if direction not in self.panels:
raise ValueError(f"No panel found in direction '{direction}'.")
panel_info = self.panels[direction]
panel_widget = panel_info["widget"]
dir_settings = self.direction_settings[direction]
# Determine final target size
if scale is not None:
main_rect = self.reference_widget.geometry()
if direction in ["left", "right"]:
computed_target = int(main_rect.width() * scale)
else:
computed_target = int(main_rect.height() * scale)
final_target_size = computed_target
else:
if target_size is None:
final_target_size = panel_info["target_size"]
else:
final_target_size = target_size
if duration is None:
duration = panel_info["duration"]
expanding_property = dir_settings["property"]
currently_visible = panel_widget.isVisible()
if ensure_max:
if panel_info["animator"] is None:
panel_info["animator"] = DimensionAnimator(panel_widget, direction)
animator = panel_info["animator"]
if direction in ["left", "right"]:
prop_name = b"panel_width"
else:
prop_name = b"panel_height"
else:
animator = None
prop_name = expanding_property
if currently_visible:
# Hide the panel
if ensure_max:
start_value = final_target_size
end_value = 0
finish_callback = lambda w=panel_widget, d=direction: self._after_hide_reset(w, d)
else:
start_value = (
panel_widget.width()
if direction in ["left", "right"]
else panel_widget.height()
)
end_value = 0
finish_callback = lambda w=panel_widget: w.setVisible(False)
else:
# Show the panel
start_value = 0
end_value = final_target_size
finish_callback = None
if ensure_max:
# Fix panel exactly
if direction in ["left", "right"]:
panel_widget.setMinimumWidth(0)
panel_widget.setMaximumWidth(final_target_size)
panel_widget.setSizePolicy(QSizePolicy.Fixed, QSizePolicy.Expanding)
else:
panel_widget.setMinimumHeight(0)
panel_widget.setMaximumHeight(final_target_size)
panel_widget.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Fixed)
else:
# Flexible mode
if direction in ["left", "right"]:
panel_widget.setMinimumWidth(0)
panel_widget.setMaximumWidth(final_target_size)
panel_widget.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
else:
panel_widget.setMinimumHeight(0)
panel_widget.setMaximumHeight(final_target_size)
panel_widget.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
panel_widget.setVisible(True)
if not animation:
# No animation: instantly set final state
if end_value == 0:
# Hiding
if ensure_max:
# Reset after hide
self._after_hide_reset(panel_widget, direction)
else:
panel_widget.setVisible(False)
else:
# Showing
if ensure_max:
# Already set fixed size
if direction in ["left", "right"]:
panel_widget.setFixedWidth(end_value)
else:
panel_widget.setFixedHeight(end_value)
else:
# Just set maximum dimension
if direction in ["left", "right"]:
panel_widget.setMaximumWidth(end_value)
else:
panel_widget.setMaximumHeight(end_value)
return
# With animation
animation = QPropertyAnimation(animator if ensure_max else panel_widget, prop_name)
animation.setDuration(duration)
animation.setStartValue(start_value)
animation.setEndValue(end_value)
animation.setEasingCurve(easing_curve)
if end_value == 0 and finish_callback:
animation.finished.connect(finish_callback)
elif end_value == 0 and not finish_callback:
animation.finished.connect(lambda w=panel_widget: w.setVisible(False))
animation.start()
self.animations[panel_widget] = animation
@typechecked
def _after_hide_reset(
self, panel_widget: QWidget, direction: Literal["left", "right", "top", "bottom"]
):
"""
Reset the panel widget after hiding it in ensure_max mode.
Args:
panel_widget(QWidget): The panel widget to reset.
direction(Literal["left", "right", "top", "bottom"]): The direction of the panel.
"""
# Called after hiding a panel in ensure_max mode
panel_widget.setVisible(False)
if direction in ["left", "right"]:
panel_widget.setMinimumWidth(0)
panel_widget.setMaximumWidth(0)
panel_widget.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
else:
panel_widget.setMinimumHeight(0)
panel_widget.setMaximumHeight(16777215)
panel_widget.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
####################################################################################################
# The following code is for the GUI control panel to interact with the CollapsiblePanelManager.
# It is not covered by any tests as it serves only as an example for the CollapsiblePanelManager class.
####################################################################################################
class MainWindow(QMainWindow): # pragma: no cover
def __init__(self):
super().__init__()
self.setWindowTitle("Panels with ensure_max, scale, and animation toggle")
self.resize(800, 600)
central_widget = QWidget()
self.setCentralWidget(central_widget)
main_layout = QVBoxLayout(central_widget)
main_layout.setContentsMargins(10, 10, 10, 10)
main_layout.setSpacing(10)
# Buttons
buttons_layout = QHBoxLayout()
self.btn_left = QPushButton("Toggle Left (ensure_max=True)")
self.btn_top = QPushButton("Toggle Top (scale=0.5, no animation)")
self.btn_right = QPushButton("Toggle Right (ensure_max=True, scale=0.3)")
self.btn_bottom = QPushButton("Toggle Bottom (no animation)")
buttons_layout.addWidget(self.btn_left)
buttons_layout.addWidget(self.btn_top)
buttons_layout.addWidget(self.btn_right)
buttons_layout.addWidget(self.btn_bottom)
main_layout.addLayout(buttons_layout)
self.layout_manager = LayoutManagerWidget()
main_layout.addWidget(self.layout_manager)
# Main widget
self.main_plot = pg.PlotWidget()
self.main_plot.plot([1, 2, 3, 4], [4, 3, 2, 1])
self.layout_manager.add_widget(self.main_plot, 0, 0)
self.panel_manager = CollapsiblePanelManager(self.layout_manager, self.main_plot)
# Panels
self.left_panel = pg.PlotWidget()
self.left_panel.plot([1, 2, 3], [3, 2, 1])
self.panel_manager.add_panel("left", self.left_panel, target_size=200)
self.right_panel = pg.PlotWidget()
self.right_panel.plot([10, 20, 30], [1, 10, 1])
self.panel_manager.add_panel("right", self.right_panel, target_size=200)
self.top_panel = pg.PlotWidget()
self.top_panel.plot([1, 2, 3], [1, 2, 3])
self.panel_manager.add_panel("top", self.top_panel, target_size=150)
self.bottom_panel = pg.PlotWidget()
self.bottom_panel.plot([2, 4, 6], [10, 5, 10])
self.panel_manager.add_panel("bottom", self.bottom_panel, target_size=150)
# Connect buttons
# Left with ensure_max
self.btn_left.clicked.connect(
lambda: self.panel_manager.toggle_panel("left", ensure_max=True)
)
# Top with scale=0.5 and no animation
self.btn_top.clicked.connect(
lambda: self.panel_manager.toggle_panel("top", scale=0.5, animation=False)
)
# Right with ensure_max, scale=0.3
self.btn_right.clicked.connect(
lambda: self.panel_manager.toggle_panel("right", ensure_max=True, scale=0.3)
)
# Bottom no animation
self.btn_bottom.clicked.connect(
lambda: self.panel_manager.toggle_panel("bottom", target_size=100, animation=False)
)
if __name__ == "__main__": # pragma: no cover
app = QApplication(sys.argv)
w = MainWindow()
w.show()
sys.exit(app.exec())

View File

@@ -2,10 +2,46 @@ import functools
import sys
import traceback
from qtpy.QtCore import QObject, Qt, Signal, Slot
from qtpy.QtCore import Property, QObject, Qt, Signal, Slot
from qtpy.QtWidgets import QApplication, QMessageBox, QPushButton, QVBoxLayout, QWidget
def SafeProperty(prop_type, *prop_args, popup_error: bool = False, **prop_kwargs):
"""
Decorator to create a Qt Property with a safe setter that won't crash Designer on errors.
Behaves similarly to SafeSlot, but for properties.
Args:
prop_type: The property type (e.g., str, bool, "QStringList", etc.)
popup_error (bool): If True, show popup on error, otherwise just handle it silently.
*prop_args, **prop_kwargs: Additional arguments and keyword arguments accepted by Property.
"""
def decorator(getter):
class PropertyWrapper:
def __init__(self, getter_func):
self.getter_func = getter_func
def setter(self, setter_func):
@functools.wraps(setter_func)
def safe_setter(self_, value):
try:
return setter_func(self_, value)
except Exception:
if popup_error:
ErrorPopupUtility().custom_exception_hook(
*sys.exc_info(), popup_error=True
)
else:
return
return Property(prop_type, self.getter_func, safe_setter, *prop_args, **prop_kwargs)
return PropertyWrapper(getter)
return decorator
def SafeSlot(*slot_args, **slot_kwargs): # pylint: disable=invalid-name
"""Function with args, acting like a decorator, applying "error_managed" decorator + Qt Slot
to the passed function, to display errors instead of potentially raising an exception
@@ -91,6 +127,12 @@ class _ErrorPopupUtility(QObject):
msg.setMinimumHeight(400)
msg.exec_()
def show_property_error(self, title, message, widget):
"""
Show a property-specific error message.
"""
self.error_occurred.emit(title, message, widget)
def format_traceback(self, traceback_message: str) -> str:
"""
Format the traceback message to be displayed in the error popup by adding indentation to each line.
@@ -127,12 +169,14 @@ class _ErrorPopupUtility(QObject):
error_message = " ".join(captured_message)
return error_message
def get_error_message(self, exctype, value, tb):
return "".join(traceback.format_exception(exctype, value, tb))
def custom_exception_hook(self, exctype, value, tb, popup_error=False):
if popup_error or self.enable_error_popup:
error_message = traceback.format_exception(exctype, value, tb)
self.error_occurred.emit(
"Method error" if popup_error else "Application Error",
"".join(error_message),
self.get_error_message(exctype, value, tb),
self.parent(),
)
else:

View File

@@ -12,7 +12,7 @@ from pydantic import BaseModel, Field, field_validator
from qtpy.QtCore import QObject, QRunnable, QThreadPool, Signal
from qtpy.QtWidgets import QApplication
from bec_widgets.cli.rpc_register import RPCRegister
from bec_widgets.cli.rpc.rpc_register import RPCRegister
from bec_widgets.qt_utils.error_popups import ErrorPopupUtility
from bec_widgets.qt_utils.error_popups import SafeSlot as pyqtSlot
from bec_widgets.utils.yaml_dialog import load_yaml, load_yaml_gui, save_yaml, save_yaml_gui

View File

@@ -93,17 +93,24 @@ def patch_designer(): # pragma: no cover
_extend_path_var("PATH", os.fspath(Path(sys._base_executable).parent), True)
else:
if sys.platform == "linux":
suffix = f"{sys.abiflags}.so"
env_var = "LD_PRELOAD"
current_pid = os.getpid()
with open(f"/proc/{current_pid}/maps", "rt") as f:
for line in f:
if "libpython" in line:
lib_path = line.split()[-1]
os.environ[env_var] = lib_path
break
elif sys.platform == "darwin":
suffix = ".dylib"
env_var = "DYLD_INSERT_LIBRARIES"
version = f"{major_version}.{minor_version}"
library_name = f"libpython{version}{suffix}"
lib_path = str(Path(sysconfig.get_config_var("LIBDIR")) / library_name)
os.environ[env_var] = lib_path
else:
raise RuntimeError(f"Unsupported platform: {sys.platform}")
version = f"{major_version}.{minor_version}"
library_name = f"libpython{version}{suffix}"
lib_path = str(Path(sysconfig.get_config_var("LIBDIR")) / library_name)
os.environ[env_var] = lib_path
if is_pyenv_python() or is_virtual_env():
# append all editable packages to the PYTHONPATH

View File

@@ -1,6 +1,5 @@
# pylint: disable=no-name-in-module
from abc import ABC, abstractmethod
from typing import Literal
from qtpy.QtWidgets import (
QApplication,
@@ -28,6 +27,15 @@ class WidgetHandler(ABC):
def set_value(self, widget: QWidget, value):
"""Set a value on the widget instance."""
def connect_change_signal(self, widget: QWidget, slot):
"""
Connect a change signal from this widget to the given slot.
If the widget type doesn't have a known "value changed" signal, do nothing.
slot: a function accepting two arguments (widget, value)
"""
pass
class LineEditHandler(WidgetHandler):
"""Handler for QLineEdit widgets."""
@@ -38,6 +46,9 @@ class LineEditHandler(WidgetHandler):
def set_value(self, widget: QLineEdit, value: str) -> None:
widget.setText(value)
def connect_change_signal(self, widget: QLineEdit, slot):
widget.textChanged.connect(lambda text, w=widget: slot(w, text))
class ComboBoxHandler(WidgetHandler):
"""Handler for QComboBox widgets."""
@@ -53,6 +64,11 @@ class ComboBoxHandler(WidgetHandler):
if isinstance(value, int):
widget.setCurrentIndex(value)
def connect_change_signal(self, widget: QComboBox, slot):
# currentIndexChanged(int) or currentIndexChanged(str) both possible.
# We use currentIndexChanged(int) for a consistent behavior.
widget.currentIndexChanged.connect(lambda idx, w=widget: slot(w, self.get_value(w)))
class TableWidgetHandler(WidgetHandler):
"""Handler for QTableWidget widgets."""
@@ -72,6 +88,16 @@ class TableWidgetHandler(WidgetHandler):
item = QTableWidgetItem(str(cell_value))
widget.setItem(row, col, item)
def connect_change_signal(self, widget: QTableWidget, slot):
# If desired, we could connect cellChanged(row, col) and then fetch all data.
# This might be noisy if table is large.
# For demonstration, connect cellChanged to update entire table value.
def on_cell_changed(row, col, w=widget):
val = self.get_value(w)
slot(w, val)
widget.cellChanged.connect(on_cell_changed)
class SpinBoxHandler(WidgetHandler):
"""Handler for QSpinBox and QDoubleSpinBox widgets."""
@@ -82,6 +108,9 @@ class SpinBoxHandler(WidgetHandler):
def set_value(self, widget, value):
widget.setValue(value)
def connect_change_signal(self, widget: QSpinBox | QDoubleSpinBox, slot):
widget.valueChanged.connect(lambda val, w=widget: slot(w, val))
class CheckBoxHandler(WidgetHandler):
"""Handler for QCheckBox widgets."""
@@ -92,6 +121,9 @@ class CheckBoxHandler(WidgetHandler):
def set_value(self, widget, value):
widget.setChecked(value)
def connect_change_signal(self, widget: QCheckBox, slot):
widget.toggled.connect(lambda val, w=widget: slot(w, val))
class LabelHandler(WidgetHandler):
"""Handler for QLabel widgets."""
@@ -99,12 +131,15 @@ class LabelHandler(WidgetHandler):
def get_value(self, widget, **kwargs):
return widget.text()
def set_value(self, widget, value):
def set_value(self, widget: QLabel, value):
widget.setText(value)
# QLabel typically doesn't have user-editable changes. No signal to connect.
# If needed, this can remain empty.
class WidgetIO:
"""Public interface for getting and setting values using handler mapping"""
"""Public interface for getting, setting values and connecting signals using handler mapping"""
_handlers = {
QLineEdit: LineEditHandler,
@@ -148,6 +183,17 @@ class WidgetIO:
elif not ignore_errors:
raise ValueError(f"No handler for widget type: {type(widget)}")
@staticmethod
def connect_widget_change_signal(widget, slot):
"""
Connect the widget's value-changed signal to a generic slot function (widget, value).
This now delegates the logic to the widget's handler.
"""
handler_class = WidgetIO._find_handler(widget)
if handler_class:
handler = handler_class()
handler.connect_change_signal(widget, slot)
@staticmethod
def check_and_adjust_limits(spin_box: QDoubleSpinBox, number: float):
"""
@@ -309,8 +355,8 @@ class WidgetHierarchy:
WidgetHierarchy.import_config_from_dict(child, widget_config, set_values)
# Example application to demonstrate the usage of the functions
if __name__ == "__main__": # pragma: no cover
# Example usage
def hierarchy_example(): # pragma: no cover
app = QApplication([])
# Create instance of WidgetHierarchy
@@ -365,3 +411,37 @@ if __name__ == "__main__": # pragma: no cover
print(f"Config dict new REDUCED: {config_dict_new_reduced}")
app.exec()
def widget_io_signal_example(): # pragma: no cover
app = QApplication([])
main_widget = QWidget()
layout = QVBoxLayout(main_widget)
line_edit = QLineEdit(main_widget)
combo_box = QComboBox(main_widget)
spin_box = QSpinBox(main_widget)
combo_box.addItems(["Option 1", "Option 2", "Option 3"])
layout.addWidget(line_edit)
layout.addWidget(combo_box)
layout.addWidget(spin_box)
main_widget.show()
def universal_slot(w, val):
print(f"Widget {w.objectName() or w} changed, new value: {val}")
# Connect all supported widgets through their handlers
WidgetIO.connect_widget_change_signal(line_edit, universal_slot)
WidgetIO.connect_widget_change_signal(combo_box, universal_slot)
WidgetIO.connect_widget_change_signal(spin_box, universal_slot)
app.exec_()
if __name__ == "__main__": # pragma: no cover
# Change example function to test different scenarios
# hierarchy_example()
widget_io_signal_example()

View File

@@ -6,7 +6,7 @@ from pydantic import Field
from pyqtgraph.dockarea import Dock, DockLabel
from qtpy import QtCore, QtGui
from bec_widgets.cli.rpc_wigdet_handler import widget_handler
from bec_widgets.cli.rpc.rpc_widget_handler import widget_handler
from bec_widgets.utils import ConnectionConfig, GridLayoutManager
from bec_widgets.utils.bec_widget import BECWidget

View File

@@ -3,11 +3,12 @@ from __future__ import annotations
from typing import Literal, Optional
from weakref import WeakValueDictionary
from bec_lib.endpoints import MessageEndpoints
from pydantic import Field
from pyqtgraph.dockarea.DockArea import DockArea
from qtpy.QtCore import Qt
from qtpy.QtCore import QSize, Qt
from qtpy.QtGui import QPainter, QPaintEvent
from qtpy.QtWidgets import QSizePolicy, QVBoxLayout, QWidget
from qtpy.QtWidgets import QApplication, QSizePolicy, QVBoxLayout, QWidget
from bec_widgets.qt_utils.error_popups import SafeSlot
from bec_widgets.qt_utils.toolbar import (
@@ -43,6 +44,7 @@ class BECDockArea(BECWidget, QWidget):
PLUGIN = True
USER_ACCESS = [
"_config_dict",
"selected_device",
"panels",
"save_state",
"remove_dock",
@@ -55,6 +57,7 @@ class BECDockArea(BECWidget, QWidget):
"temp_areas",
"show",
"hide",
"delete",
]
def __init__(
@@ -158,6 +161,9 @@ class BECDockArea(BECWidget, QWidget):
self.toolbar.addWidget(DarkModeButton(toolbar=True))
self._hook_toolbar()
def minimumSizeHint(self):
return QSize(800, 600)
def _hook_toolbar(self):
# Menu Plot
self.toolbar.widgets["menu_plots"].widgets["waveform"].triggered.connect(
@@ -210,6 +216,17 @@ class BECDockArea(BECWidget, QWidget):
"Add docks using 'add_dock' method from CLI\n or \n Add widget docks using the toolbar",
)
@property
def selected_device(self) -> str:
gui_id = QApplication.instance().gui_id
auto_update_config = self.client.connector.get(
MessageEndpoints.gui_auto_update_config(gui_id)
)
try:
return auto_update_config.selected_device
except AttributeError:
return None
@property
def panels(self) -> dict[str, BECDock]:
"""
@@ -406,6 +423,17 @@ class BECDockArea(BECWidget, QWidget):
self.dock_area.deleteLater()
super().cleanup()
def closeEvent(self, event):
if self.parent() is None:
# we are at top-level (independent window)
if self.isVisible():
# we are visible => user clicked on [X]
# (when closeEvent is called from shutdown procedure,
# everything is hidden first)
# so, let's ignore "close", and do hide instead
event.ignore()
self.setVisible(False)
def close(self):
"""
Close the dock area and cleanup.
@@ -418,14 +446,24 @@ class BECDockArea(BECWidget, QWidget):
"""Show all windows including floating docks."""
super().show()
for docks in self.panels.values():
if docks.window() is self:
# avoid recursion
continue
docks.window().show()
def hide(self):
"""Hide all windows including floating docks."""
super().hide()
for docks in self.panels.values():
if docks.window() is self:
# avoid recursion
continue
docks.window().hide()
def delete(self):
self.hide()
self.deleteLater()
if __name__ == "__main__":
from qtpy.QtWidgets import QApplication

View File

@@ -634,7 +634,7 @@ class BECImageShow(BECPlotBase):
)
image_item.connected = False
if monitor and image_item.connected is False:
self.entry_validator.validate_monitor(monitor)
# self.entry_validator.validate_monitor(monitor)
if self.image_type == "device_monitor_1d":
self.bec_dispatcher.connect_slot(
self.on_image_update, MessageEndpoints.device_monitor_1d(monitor)

View File

@@ -0,0 +1,881 @@
import math
import sys
from typing import Dict, Literal, Optional, Set, Tuple, Union
from qtpy.QtWidgets import (
QApplication,
QComboBox,
QGridLayout,
QGroupBox,
QHBoxLayout,
QLabel,
QLineEdit,
QMainWindow,
QMessageBox,
QPushButton,
QSpinBox,
QSplitter,
QVBoxLayout,
QWidget,
)
from typeguard import typechecked
from bec_widgets.cli.rpc.rpc_widget_handler import widget_handler
class LayoutManagerWidget(QWidget):
"""
A robust layout manager that extends QGridLayout functionality, allowing
users to add/remove widgets, access widgets by coordinates, shift widgets,
and change the layout dynamically with automatic reindexing to keep the grid compact.
Supports adding widgets via QWidget instances or string identifiers referencing the widget handler.
"""
def __init__(self, parent=None, auto_reindex=True):
super().__init__(parent)
self.layout = QGridLayout(self)
self.auto_reindex = auto_reindex
# Mapping from widget to its position (row, col, rowspan, colspan)
self.widget_positions: Dict[QWidget, Tuple[int, int, int, int]] = {}
# Mapping from (row, col) to widget
self.position_widgets: Dict[Tuple[int, int], QWidget] = {}
# Keep track of the current position for automatic placement
self.current_row = 0
self.current_col = 0
def add_widget(
self,
widget: QWidget | str,
row: int | None = None,
col: Optional[int] = None,
rowspan: int = 1,
colspan: int = 1,
shift_existing: bool = True,
shift_direction: Literal["down", "up", "left", "right"] = "right",
) -> QWidget:
"""
Add a widget to the grid with enhanced shifting capabilities.
Args:
widget (QWidget | str): The widget to add. If str, it is used to create a widget via widget_handler.
row (int, optional): The row to add the widget to. If None, the next available row is used.
col (int, optional): The column to add the widget to. If None, the next available column is used.
rowspan (int): Number of rows the widget spans. Default is 1.
colspan (int): Number of columns the widget spans. Default is 1.
shift_existing (bool): Whether to shift existing widgets if the target position is occupied. Default is True.
shift_direction (Literal["down", "up", "left", "right"]): Direction to shift existing widgets. Default is "right".
Returns:
QWidget: The widget that was added.
"""
# Handle widget creation if a BECWidget string identifier is provided
if isinstance(widget, str):
widget = widget_handler.create_widget(widget)
if row is None:
row = self.current_row
if col is None:
col = self.current_col
if (row, col) in self.position_widgets:
if shift_existing:
# Attempt to shift the existing widget in the specified direction
self.shift_widgets(direction=shift_direction, start_row=row, start_col=col)
else:
raise ValueError(f"Position ({row}, {col}) is already occupied.")
# Add the widget to the layout
self.layout.addWidget(widget, row, col, rowspan, colspan)
self.widget_positions[widget] = (row, col, rowspan, colspan)
self.position_widgets[(row, col)] = widget
# Update current position for automatic placement
self.current_col = col + colspan
self.current_row = max(self.current_row, row)
if self.auto_reindex:
self.reindex_grid()
return widget
def add_widget_relative(
self,
widget: QWidget | str,
reference_widget: QWidget,
position: Literal["left", "right", "top", "bottom"],
rowspan: int = 1,
colspan: int = 1,
shift_existing: bool = True,
shift_direction: Literal["down", "up", "left", "right"] = "right",
) -> QWidget:
"""
Add a widget relative to an existing widget.
Args:
widget (QWidget | str): The widget to add. If str, it is used to create a widget via widget_handler.
reference_widget (QWidget): The widget relative to which the new widget will be placed.
position (Literal["left", "right", "top", "bottom"]): Position relative to the reference widget.
rowspan (int): Number of rows the widget spans. Default is 1.
colspan (int): Number of columns the widget spans. Default is 1.
shift_existing (bool): Whether to shift existing widgets if the target position is occupied.
shift_direction (Literal["down", "up", "left", "right"]): Direction to shift existing widgets.
Returns:
QWidget: The widget that was added.
Raises:
ValueError: If the reference widget is not found.
"""
if reference_widget not in self.widget_positions:
raise ValueError("Reference widget not found in layout.")
ref_row, ref_col, ref_rowspan, ref_colspan = self.widget_positions[reference_widget]
# Determine new widget position based on the specified relative position
if position == "left":
new_row = ref_row
new_col = ref_col - 1
elif position == "right":
new_row = ref_row
new_col = ref_col + ref_colspan
elif position == "top":
new_row = ref_row - 1
new_col = ref_col
elif position == "bottom":
new_row = ref_row + ref_rowspan
new_col = ref_col
else:
raise ValueError("Invalid position. Choose from 'left', 'right', 'top', 'bottom'.")
# Add the widget at the calculated position
return self.add_widget(
widget=widget,
row=new_row,
col=new_col,
rowspan=rowspan,
colspan=colspan,
shift_existing=shift_existing,
shift_direction=shift_direction,
)
def move_widget_by_coords(
self,
current_row: int,
current_col: int,
new_row: int,
new_col: int,
shift: bool = True,
shift_direction: Literal["down", "up", "left", "right"] = "right",
) -> None:
"""
Move a widget from (current_row, current_col) to (new_row, new_col).
Args:
current_row (int): Current row of the widget.
current_col (int): Current column of the widget.
new_row (int): Target row.
new_col (int): Target column.
shift (bool): Whether to shift existing widgets if the target position is occupied.
shift_direction (Literal["down", "up", "left", "right"]): Direction to shift existing widgets.
Raises:
ValueError: If the widget is not found or target position is invalid.
"""
self.move_widget(
old_row=current_row,
old_col=current_col,
new_row=new_row,
new_col=new_col,
shift=shift,
shift_direction=shift_direction,
)
@typechecked
def move_widget_by_object(
self,
widget: QWidget,
new_row: int,
new_col: int,
shift: bool = True,
shift_direction: Literal["down", "up", "left", "right"] = "right",
) -> None:
"""
Move a widget to a new position using the widget object.
Args:
widget (QWidget): The widget to move.
new_row (int): Target row.
new_col (int): Target column.
shift (bool): Whether to shift existing widgets if the target position is occupied.
shift_direction (Literal["down", "up", "left", "right"]): Direction to shift existing widgets.
Raises:
ValueError: If the widget is not found or target position is invalid.
"""
if widget not in self.widget_positions:
raise ValueError("Widget not found in layout.")
old_position = self.widget_positions[widget]
old_row, old_col = old_position[0], old_position[1]
self.move_widget(
old_row=old_row,
old_col=old_col,
new_row=new_row,
new_col=new_col,
shift=shift,
shift_direction=shift_direction,
)
@typechecked
def move_widget(
self,
old_row: int | None = None,
old_col: int | None = None,
new_row: int | None = None,
new_col: int | None = None,
shift: bool = True,
shift_direction: Literal["down", "up", "left", "right"] = "right",
) -> None:
"""
Move a widget to a new position. If the new position is occupied and shift is True,
shift the existing widget to the specified direction.
Args:
old_row (int, optional): The current row of the widget.
old_col (int, optional): The current column of the widget.
new_row (int, optional): The target row to move the widget to.
new_col (int, optional): The target column to move the widget to.
shift (bool): Whether to shift existing widgets if the target position is occupied.
shift_direction (Literal["down", "up", "left", "right"]): Direction to shift existing widgets.
Raises:
ValueError: If the widget is not found or target position is invalid.
"""
if new_row is None or new_col is None:
raise ValueError("Must provide both new_row and new_col to move a widget.")
if old_row is None and old_col is None:
raise ValueError(f"No widget found at position ({old_row}, {old_col}).")
widget = self.get_widget(old_row, old_col)
if (new_row, new_col) in self.position_widgets:
if not shift:
raise ValueError(f"Position ({new_row}, {new_col}) is already occupied.")
# Shift the existing widget to make space
self.shift_widgets(
direction=shift_direction,
start_row=new_row if shift_direction in ["down", "up"] else 0,
start_col=new_col if shift_direction in ["left", "right"] else 0,
)
# Proceed to move the widget
self.layout.removeWidget(widget)
old_position = self.widget_positions.pop(widget)
self.position_widgets.pop((old_position[0], old_position[1]))
self.layout.addWidget(widget, new_row, new_col, old_position[2], old_position[3])
self.widget_positions[widget] = (new_row, new_col, old_position[2], old_position[3])
self.position_widgets[(new_row, new_col)] = widget
# Update current_row and current_col for automatic placement if needed
self.current_row = max(self.current_row, new_row)
self.current_col = max(self.current_col, new_col + old_position[3])
if self.auto_reindex:
self.reindex_grid()
@typechecked
def shift_widgets(
self,
direction: Literal["down", "up", "left", "right"],
start_row: int = 0,
start_col: int = 0,
) -> None:
"""
Shift widgets in the grid in the specified direction starting from the given position.
Args:
direction (Literal["down", "up", "left", "right"]): Direction to shift widgets.
start_row (int): Starting row index.
start_col (int): Starting column index.
Raises:
ValueError: If shifting causes widgets to go out of grid boundaries.
"""
shifts = []
positions_to_shift = [(start_row, start_col)]
visited_positions = set()
while positions_to_shift:
row, col = positions_to_shift.pop(0)
if (row, col) in visited_positions:
continue
visited_positions.add((row, col))
widget = self.position_widgets.get((row, col))
if widget is None:
continue # No widget at this position
# Compute new position based on the direction
if direction == "down":
new_row = row + 1
new_col = col
elif direction == "up":
new_row = row - 1
new_col = col
elif direction == "right":
new_row = row
new_col = col + 1
elif direction == "left":
new_row = row
new_col = col - 1
# Check for negative indices
if new_row < 0 or new_col < 0:
raise ValueError("Shifting widgets out of grid boundaries.")
# If the new position is occupied, add it to the positions to shift
if (new_row, new_col) in self.position_widgets:
positions_to_shift.append((new_row, new_col))
shifts.append(
(widget, (row, col), (new_row, new_col), self.widget_positions[widget][2:])
)
# Remove all widgets from their old positions
for widget, (old_row, old_col), _, _ in shifts:
self.layout.removeWidget(widget)
self.position_widgets.pop((old_row, old_col))
# Add widgets to their new positions
for widget, _, (new_row, new_col), (rowspan, colspan) in shifts:
self.layout.addWidget(widget, new_row, new_col, rowspan, colspan)
self.widget_positions[widget] = (new_row, new_col, rowspan, colspan)
self.position_widgets[(new_row, new_col)] = widget
# Update current_row and current_col if needed
self.current_row = max(self.current_row, new_row)
self.current_col = max(self.current_col, new_col + colspan)
def shift_all_widgets(self, direction: Literal["down", "up", "left", "right"]) -> None:
"""
Shift all widgets in the grid in the specified direction to make room and prevent negative indices.
Args:
direction (Literal["down", "up", "left", "right"]): Direction to shift all widgets.
"""
# First, collect all the shifts to perform
shifts = []
for widget, (row, col, rowspan, colspan) in self.widget_positions.items():
if direction == "down":
new_row = row + 1
new_col = col
elif direction == "up":
new_row = row - 1
new_col = col
elif direction == "right":
new_row = row
new_col = col + 1
elif direction == "left":
new_row = row
new_col = col - 1
# Check for negative indices
if new_row < 0 or new_col < 0:
raise ValueError("Shifting widgets out of grid boundaries.")
shifts.append((widget, (row, col), (new_row, new_col), (rowspan, colspan)))
# Now perform the shifts
for widget, (old_row, old_col), (new_row, new_col), (rowspan, colspan) in shifts:
self.layout.removeWidget(widget)
self.position_widgets.pop((old_row, old_col))
for widget, (old_row, old_col), (new_row, new_col), (rowspan, colspan) in shifts:
self.layout.addWidget(widget, new_row, new_col, rowspan, colspan)
self.widget_positions[widget] = (new_row, new_col, rowspan, colspan)
self.position_widgets[(new_row, new_col)] = widget
# Update current_row and current_col based on new widget positions
self.current_row = max((pos[0] for pos in self.position_widgets.keys()), default=0)
self.current_col = max((pos[1] for pos in self.position_widgets.keys()), default=0)
def remove(
self,
row: int | None = None,
col: int | None = None,
coordinates: Tuple[int, int] | None = None,
) -> None:
"""
Remove a widget from the layout. Can be removed by widget ID or by coordinates.
Args:
row (int, optional): The row coordinate of the widget to remove.
col (int, optional): The column coordinate of the widget to remove.
coordinates (tuple[int, int], optional): The (row, col) coordinates of the widget to remove.
Raises:
ValueError: If the widget to remove is not found.
"""
if coordinates:
row, col = coordinates
widget = self.get_widget(row, col)
if widget is None:
raise ValueError(f"No widget found at coordinates {coordinates}.")
elif row is not None and col is not None:
widget = self.get_widget(row, col)
if widget is None:
raise ValueError(f"No widget found at position ({row}, {col}).")
else:
raise ValueError(
"Must provide either widget_id, coordinates, or both row and col for removal."
)
self.remove_widget(widget)
def remove_widget(self, widget: QWidget) -> None:
"""
Remove a widget from the grid and reindex the grid to keep it compact.
Args:
widget (QWidget): The widget to remove.
Raises:
ValueError: If the widget is not found in the layout.
"""
if widget not in self.widget_positions:
raise ValueError("Widget not found in layout.")
position = self.widget_positions.pop(widget)
self.position_widgets.pop((position[0], position[1]))
self.layout.removeWidget(widget)
widget.setParent(None) # Remove widget from the parent
widget.deleteLater()
# Reindex the grid to maintain compactness
if self.auto_reindex:
self.reindex_grid()
def get_widget(self, row: int, col: int) -> QWidget | None:
"""
Get the widget at the specified position.
Args:
row (int): The row coordinate.
col (int): The column coordinate.
Returns:
QWidget | None: The widget at the specified position, or None if empty.
"""
return self.position_widgets.get((row, col))
def get_widget_position(self, widget: QWidget) -> Tuple[int, int, int, int] | None:
"""
Get the position of the specified widget.
Args:
widget (QWidget): The widget to query.
Returns:
Tuple[int, int, int, int] | None: The (row, col, rowspan, colspan) tuple, or None if not found.
"""
return self.widget_positions.get(widget)
def change_layout(self, num_rows: int | None = None, num_cols: int | None = None) -> None:
"""
Change the layout to have a certain number of rows and/or columns,
rearranging the widgets accordingly.
If only one of num_rows or num_cols is provided, the other is calculated automatically
based on the number of widgets and the provided constraint.
If both are provided, num_rows is calculated based on num_cols.
Args:
num_rows (int | None): The new maximum number of rows.
num_cols (int | None): The new maximum number of columns.
"""
if num_rows is None and num_cols is None:
return # Nothing to change
total_widgets = len(self.widget_positions)
if num_cols is not None:
# Calculate num_rows based on num_cols
num_rows = math.ceil(total_widgets / num_cols)
elif num_rows is not None:
# Calculate num_cols based on num_rows
num_cols = math.ceil(total_widgets / num_rows)
# Sort widgets by current position (row-major order)
widgets_sorted = sorted(
self.widget_positions.items(),
key=lambda item: (item[1][0], item[1][1]), # Sort by row, then column
)
# Clear the layout without deleting widgets
for widget, _ in widgets_sorted:
self.layout.removeWidget(widget)
# Reset position mappings
self.widget_positions.clear()
self.position_widgets.clear()
# Re-add widgets based on new layout constraints
current_row, current_col = 0, 0
for widget, _ in widgets_sorted:
if current_col >= num_cols:
current_col = 0
current_row += 1
self.layout.addWidget(widget, current_row, current_col, 1, 1)
self.widget_positions[widget] = (current_row, current_col, 1, 1)
self.position_widgets[(current_row, current_col)] = widget
current_col += 1
# Update current_row and current_col for automatic placement
self.current_row = current_row
self.current_col = current_col
# Reindex the grid to ensure compactness
self.reindex_grid()
def clear_layout(self) -> None:
"""
Remove all widgets from the layout without deleting them.
"""
for widget in list(self.widget_positions):
self.layout.removeWidget(widget)
self.position_widgets.pop(
(self.widget_positions[widget][0], self.widget_positions[widget][1])
)
self.widget_positions.pop(widget)
widget.setParent(None) # Optionally hide/remove the widget
self.current_row = 0
self.current_col = 0
def reindex_grid(self) -> None:
"""
Reindex the grid to remove empty rows and columns, ensuring that
widget coordinates are contiguous and start from (0, 0).
"""
# Step 1: Collect all occupied positions
occupied_positions = sorted(self.position_widgets.keys())
if not occupied_positions:
# No widgets to reindex
self.clear_layout()
return
# Step 2: Determine the new mapping by eliminating empty columns and rows
# Find unique rows and columns
unique_rows = sorted(set(pos[0] for pos in occupied_positions))
unique_cols = sorted(set(pos[1] for pos in occupied_positions))
# Create mappings from old to new indices
row_mapping = {old_row: new_row for new_row, old_row in enumerate(unique_rows)}
col_mapping = {old_col: new_col for new_col, old_col in enumerate(unique_cols)}
# Step 3: Collect widgets with their new positions
widgets_with_new_positions = []
for widget, (row, col, rowspan, colspan) in self.widget_positions.items():
new_row = row_mapping[row]
new_col = col_mapping[col]
widgets_with_new_positions.append((widget, new_row, new_col, rowspan, colspan))
# Step 4: Clear the layout and reset mappings
self.clear_layout()
# Reset current_row and current_col
self.current_row = 0
self.current_col = 0
# Step 5: Re-add widgets with new positions
for widget, new_row, new_col, rowspan, colspan in widgets_with_new_positions:
self.layout.addWidget(widget, new_row, new_col, rowspan, colspan)
self.widget_positions[widget] = (new_row, new_col, rowspan, colspan)
self.position_widgets[(new_row, new_col)] = widget
# Update current position for automatic placement
self.current_col = max(self.current_col, new_col + colspan)
self.current_row = max(self.current_row, new_row)
def get_widgets_positions(self) -> Dict[QWidget, Tuple[int, int, int, int]]:
"""
Get the positions of all widgets in the layout.
Returns:
Dict[QWidget, Tuple[int, int, int, int]]: Mapping of widgets to their (row, col, rowspan, colspan).
"""
return self.widget_positions.copy()
def print_all_button_text(self):
"""Debug function to print the text of all QPushButton widgets."""
print("Coordinates - Button Text")
for coord, widget in self.position_widgets.items():
if isinstance(widget, QPushButton):
print(f"{coord} - {widget.text()}")
####################################################################################################
# The following code is for the GUI control panel to interact with the LayoutManagerWidget.
# It is not covered by any tests as it serves only as an example for the LayoutManagerWidget class.
####################################################################################################
class ControlPanel(QWidget): # pragma: no cover
def __init__(self, layout_manager: LayoutManagerWidget):
super().__init__()
self.layout_manager = layout_manager
self.init_ui()
def init_ui(self):
main_layout = QVBoxLayout()
# Add Widget by Coordinates
add_coord_group = QGroupBox("Add Widget by Coordinates")
add_coord_layout = QGridLayout()
add_coord_layout.addWidget(QLabel("Text:"), 0, 0)
self.text_input = QLineEdit()
add_coord_layout.addWidget(self.text_input, 0, 1)
add_coord_layout.addWidget(QLabel("Row:"), 1, 0)
self.row_input = QSpinBox()
self.row_input.setMinimum(0)
add_coord_layout.addWidget(self.row_input, 1, 1)
add_coord_layout.addWidget(QLabel("Column:"), 2, 0)
self.col_input = QSpinBox()
self.col_input.setMinimum(0)
add_coord_layout.addWidget(self.col_input, 2, 1)
self.add_button = QPushButton("Add at Coordinates")
self.add_button.clicked.connect(self.add_at_coordinates)
add_coord_layout.addWidget(self.add_button, 3, 0, 1, 2)
add_coord_group.setLayout(add_coord_layout)
main_layout.addWidget(add_coord_group)
# Add Widget Relative
add_rel_group = QGroupBox("Add Widget Relative to Existing")
add_rel_layout = QGridLayout()
add_rel_layout.addWidget(QLabel("Text:"), 0, 0)
self.rel_text_input = QLineEdit()
add_rel_layout.addWidget(self.rel_text_input, 0, 1)
add_rel_layout.addWidget(QLabel("Reference Widget:"), 1, 0)
self.ref_widget_combo = QComboBox()
add_rel_layout.addWidget(self.ref_widget_combo, 1, 1)
add_rel_layout.addWidget(QLabel("Position:"), 2, 0)
self.position_combo = QComboBox()
self.position_combo.addItems(["left", "right", "top", "bottom"])
add_rel_layout.addWidget(self.position_combo, 2, 1)
self.add_rel_button = QPushButton("Add Relative")
self.add_rel_button.clicked.connect(self.add_relative)
add_rel_layout.addWidget(self.add_rel_button, 3, 0, 1, 2)
add_rel_group.setLayout(add_rel_layout)
main_layout.addWidget(add_rel_group)
# Remove Widget
remove_group = QGroupBox("Remove Widget")
remove_layout = QGridLayout()
remove_layout.addWidget(QLabel("Row:"), 0, 0)
self.remove_row_input = QSpinBox()
self.remove_row_input.setMinimum(0)
remove_layout.addWidget(self.remove_row_input, 0, 1)
remove_layout.addWidget(QLabel("Column:"), 1, 0)
self.remove_col_input = QSpinBox()
self.remove_col_input.setMinimum(0)
remove_layout.addWidget(self.remove_col_input, 1, 1)
self.remove_button = QPushButton("Remove at Coordinates")
self.remove_button.clicked.connect(self.remove_widget)
remove_layout.addWidget(self.remove_button, 2, 0, 1, 2)
remove_group.setLayout(remove_layout)
main_layout.addWidget(remove_group)
# Change Layout
change_layout_group = QGroupBox("Change Layout")
change_layout_layout = QGridLayout()
change_layout_layout.addWidget(QLabel("Number of Rows:"), 0, 0)
self.change_rows_input = QSpinBox()
self.change_rows_input.setMinimum(1)
self.change_rows_input.setValue(1) # Default value
change_layout_layout.addWidget(self.change_rows_input, 0, 1)
change_layout_layout.addWidget(QLabel("Number of Columns:"), 1, 0)
self.change_cols_input = QSpinBox()
self.change_cols_input.setMinimum(1)
self.change_cols_input.setValue(1) # Default value
change_layout_layout.addWidget(self.change_cols_input, 1, 1)
self.change_layout_button = QPushButton("Apply Layout Change")
self.change_layout_button.clicked.connect(self.change_layout)
change_layout_layout.addWidget(self.change_layout_button, 2, 0, 1, 2)
change_layout_group.setLayout(change_layout_layout)
main_layout.addWidget(change_layout_group)
# Remove All Widgets
self.clear_all_button = QPushButton("Clear All Widgets")
self.clear_all_button.clicked.connect(self.clear_all_widgets)
main_layout.addWidget(self.clear_all_button)
# Refresh Reference Widgets and Print Button
self.refresh_button = QPushButton("Refresh Reference Widgets")
self.refresh_button.clicked.connect(self.refresh_references)
self.print_button = QPushButton("Print All Button Text")
self.print_button.clicked.connect(self.layout_manager.print_all_button_text)
main_layout.addWidget(self.refresh_button)
main_layout.addWidget(self.print_button)
main_layout.addStretch()
self.setLayout(main_layout)
self.refresh_references()
def refresh_references(self):
self.ref_widget_combo.clear()
widgets = self.layout_manager.get_widgets_positions()
for widget in widgets:
if isinstance(widget, QPushButton):
self.ref_widget_combo.addItem(widget.text(), widget)
def add_at_coordinates(self):
text = self.text_input.text()
row = self.row_input.value()
col = self.col_input.value()
if not text:
QMessageBox.warning(self, "Input Error", "Please enter text for the button.")
return
button = QPushButton(text)
try:
self.layout_manager.add_widget(widget=button, row=row, col=col)
self.refresh_references()
except Exception as e:
QMessageBox.critical(self, "Error", str(e))
def add_relative(self):
text = self.rel_text_input.text()
ref_index = self.ref_widget_combo.currentIndex()
ref_widget = self.ref_widget_combo.itemData(ref_index)
position = self.position_combo.currentText()
if not text:
QMessageBox.warning(self, "Input Error", "Please enter text for the button.")
return
if ref_widget is None:
QMessageBox.warning(self, "Input Error", "Please select a reference widget.")
return
button = QPushButton(text)
try:
self.layout_manager.add_widget_relative(
widget=button, reference_widget=ref_widget, position=position
)
self.refresh_references()
except Exception as e:
QMessageBox.critical(self, "Error", str(e))
def remove_widget(self):
row = self.remove_row_input.value()
col = self.remove_col_input.value()
try:
widget = self.layout_manager.get_widget(row, col)
if widget is None:
QMessageBox.warning(self, "Not Found", f"No widget found at ({row}, {col}).")
return
self.layout_manager.remove_widget(widget)
self.refresh_references()
except Exception as e:
QMessageBox.critical(self, "Error", str(e))
def change_layout(self):
num_rows = self.change_rows_input.value()
num_cols = self.change_cols_input.value()
try:
self.layout_manager.change_layout(num_rows=num_rows, num_cols=num_cols)
self.refresh_references()
except Exception as e:
QMessageBox.critical(self, "Error", str(e))
def clear_all_widgets(self):
reply = QMessageBox.question(
self,
"Confirm Clear",
"Are you sure you want to remove all widgets?",
QMessageBox.Yes | QMessageBox.No,
QMessageBox.No,
)
if reply == QMessageBox.Yes:
try:
self.layout_manager.clear_layout()
self.refresh_references()
except Exception as e:
QMessageBox.critical(self, "Error", str(e))
class MainWindow(QMainWindow): # pragma: no cover
def __init__(self):
super().__init__()
self.setWindowTitle("Layout Manager Demo")
self.resize(800, 600)
self.init_ui()
def init_ui(self):
central_widget = QWidget()
main_layout = QHBoxLayout()
# Layout Area GroupBox
layout_group = QGroupBox("Layout Area")
layout_group.setMinimumSize(400, 400)
layout_layout = QVBoxLayout()
self.layout_manager = LayoutManagerWidget()
layout_layout.addWidget(self.layout_manager)
layout_group.setLayout(layout_layout)
# Splitter
splitter = QSplitter()
splitter.addWidget(layout_group)
# Control Panel
control_panel = ControlPanel(self.layout_manager)
control_group = QGroupBox("Control Panel")
control_layout = QVBoxLayout()
control_layout.addWidget(control_panel)
control_layout.addStretch()
control_group.setLayout(control_layout)
splitter.addWidget(control_group)
main_layout.addWidget(splitter)
central_widget.setLayout(main_layout)
self.setCentralWidget(central_widget)
if __name__ == "__main__": # pragma: no cover
app = QApplication(sys.argv)
window = MainWindow()
window.show()
sys.exit(app.exec_())

View File

@@ -1,9 +1,41 @@
from qtpy.QtWidgets import QMainWindow
from qtpy.QtWidgets import QApplication, QMainWindow
from bec_widgets.utils import BECConnector
from bec_widgets.widgets.containers.dock.dock_area import BECDockArea
class BECMainWindow(QMainWindow, BECConnector):
def __init__(self, *args, **kwargs):
BECConnector.__init__(self, **kwargs)
QMainWindow.__init__(self, *args, **kwargs)
def _dump(self):
"""Return a dictionary with informations about the application state, for use in tests"""
# TODO: ModularToolBar and something else leak top-level widgets (3 or 4 QMenu + 2 QWidget);
# so, a filtering based on title is applied here, but the solution is to not have those widgets
# as top-level (so for now, a window with no title does not appear in _dump() result)
# NOTE: the main window itself is excluded, since we want to dump dock areas
info = {
tlw.gui_id: {
"title": tlw.windowTitle(),
"visible": tlw.isVisible(),
"class": str(type(tlw)),
}
for tlw in QApplication.instance().topLevelWidgets()
if tlw is not self and tlw.windowTitle()
}
# Add the main window dock area
info[self.centralWidget().gui_id] = {
"title": self.windowTitle(),
"visible": self.isVisible(),
"class": str(type(self.centralWidget())),
}
return info
def new_dock_area(self, name):
dock_area = BECDockArea()
dock_area.resize(dock_area.minimumSizeHint())
dock_area.window().setWindowTitle(name)
dock_area.show()
return dock_area

View File

@@ -190,7 +190,7 @@ class BECImageWidget(BECWidget, QWidget):
###################################
# User Access Methods from image
###################################
@SafeSlot(popup_error=True)
@SafeSlot(popup_error=False)
def image(
self,
monitor: str,

View File

@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
[project]
name = "bec_widgets"
version = "1.9.0"
version = "1.12.0"
description = "BEC Widgets"
requires-python = ">=3.10"
classifiers = [
@@ -30,7 +30,7 @@ dependencies = [
dev = [
"coverage~=7.0",
"fakeredis~=2.23, >=2.23.2",
"pytest-bec-e2e~=2.16",
"pytest-bec-e2e>=2.21.4, <=4.0",
"pytest-qt~=4.4",
"pytest-random-order~=1.1",
"pytest-timeout~=2.2",

View File

@@ -5,8 +5,7 @@ from contextlib import contextmanager
import pytest
from bec_lib.endpoints import MessageEndpoints
from bec_widgets.cli.client import BECDockArea
from bec_widgets.cli.client_utils import _start_plot_process
from bec_widgets.cli.client_utils import BECGuiClient, _start_plot_process
from bec_widgets.utils import BECDispatcher
from bec_widgets.widgets.containers.figure import BECFigure
@@ -41,27 +40,37 @@ def plot_server(gui_id, klass, client_lib):
@pytest.fixture
def rpc_server_figure(gui_id, bec_client_lib):
def connected_client_figure(gui_id, bec_client_lib):
with plot_server(gui_id, BECFigure, bec_client_lib) as server:
yield server
@pytest.fixture
def rpc_server_dock(gui_id, bec_client_lib):
dock_area = BECDockArea(gui_id=gui_id)
dock_area._auto_updates_enabled = False
def connected_client_gui_obj(gui_id, bec_client_lib):
gui = BECGuiClient(gui_id=gui_id)
try:
dock_area.start_server(wait=True)
yield dock_area
gui.start_server(wait=True)
yield gui
finally:
dock_area.close()
gui.close()
@pytest.fixture
def rpc_server_dock_w_auto_updates(gui_id, bec_client_lib):
dock_area = BECDockArea(gui_id=gui_id)
def connected_client_dock(gui_id, bec_client_lib):
gui = BECGuiClient(gui_id=gui_id)
gui._auto_updates_enabled = False
try:
dock_area.start_server(wait=True)
yield dock_area
gui.start_server(wait=True)
yield gui.main
finally:
dock_area.close()
gui.close()
@pytest.fixture
def connected_client_dock_w_auto_updates(gui_id, bec_client_lib):
gui = BECGuiClient(gui_id=gui_id)
try:
gui.start_server(wait=True)
yield gui, gui.main
finally:
gui.close()

View File

@@ -1,9 +1,9 @@
import time
import numpy as np
import pytest
from bec_lib.endpoints import MessageEndpoints
from bec_widgets.cli.auto_updates import AutoUpdates
from bec_widgets.cli.client import BECDockArea, BECFigure, BECImageShow, BECMotorMap, BECWaveform
from bec_widgets.utils import Colors
@@ -12,9 +12,9 @@ from bec_widgets.utils import Colors
# pylint: disable=too-many-locals
def test_rpc_add_dock_with_figure_e2e(bec_client_lib, rpc_server_dock):
def test_rpc_add_dock_with_figure_e2e(bec_client_lib, connected_client_dock):
# BEC client shortcuts
dock = rpc_server_dock
dock = connected_client_dock
client = bec_client_lib
dev = client.device_manager.devices
scans = client.scans
@@ -123,8 +123,8 @@ def test_rpc_add_dock_with_figure_e2e(bec_client_lib, rpc_server_dock):
)
def test_dock_manipulations_e2e(rpc_server_dock):
dock = rpc_server_dock
def test_dock_manipulations_e2e(connected_client_dock):
dock = connected_client_dock
d0 = dock.add_dock("dock_0")
d1 = dock.add_dock("dock_1")
@@ -155,8 +155,8 @@ def test_dock_manipulations_e2e(rpc_server_dock):
assert len(dock.temp_areas) == 0
def test_ring_bar(rpc_server_dock):
dock = rpc_server_dock
def test_ring_bar(connected_client_dock):
dock = connected_client_dock
d0 = dock.add_dock(name="dock_0")
@@ -182,8 +182,8 @@ def test_ring_bar(rpc_server_dock):
assert bar_colors == expected_colors_light or bar_colors == expected_colors_dark
def test_ring_bar_scan_update(bec_client_lib, rpc_server_dock):
dock = rpc_server_dock
def test_ring_bar_scan_update(bec_client_lib, connected_client_dock):
dock = connected_client_dock
d0 = dock.add_dock("dock_0")
@@ -234,19 +234,20 @@ def test_ring_bar_scan_update(bec_client_lib, rpc_server_dock):
assert bar_config["rings"][1]["max_value"] == final_samy
def test_auto_update(bec_client_lib, rpc_server_dock_w_auto_updates, qtbot):
def test_auto_update(bec_client_lib, connected_client_dock_w_auto_updates, qtbot):
client = bec_client_lib
dev = client.device_manager.devices
scans = client.scans
queue = client.queue
dock = rpc_server_dock_w_auto_updates
gui, dock = connected_client_dock_w_auto_updates
auto_updates = gui.auto_updates
def get_default_figure():
return dock.auto_updates.get_default_figure()
return auto_updates.get_default_figure()
plt = get_default_figure()
dock.selected_device = "bpm4i"
gui.selected_device = "bpm4i"
status = scans.line_scan(dev.samx, -5, 5, steps=10, exp_time=0.05, relative=False)
status.wait()
@@ -274,7 +275,7 @@ def test_auto_update(bec_client_lib, rpc_server_dock_w_auto_updates, qtbot):
)
status.wait()
plt = dock.auto_updates.get_default_figure()
plt = auto_updates.get_default_figure()
widgets = plt.widget_list
qtbot.waitUntil(lambda: len(plt.widget_list) > 0, timeout=5000)
plt_data = widgets[0].get_all_data()
@@ -291,3 +292,69 @@ def test_auto_update(bec_client_lib, rpc_server_dock_w_auto_updates, qtbot):
plt_data[f"Scan {status.scan.scan_number} - {dock.selected_device}"]["y"]
== last_scan_data["samy"]["samy"].val
)
def test_rpc_gui_obj(connected_client_gui_obj, qtbot):
gui = connected_client_gui_obj
assert gui.selected_device is None
assert len(gui.windows) == 1
assert gui.windows["main"].widget is gui.main
assert gui.windows["main"].title == "BEC Widgets"
mw = gui.main
assert mw.__class__.__name__ == "BECDockArea"
xw = gui.new("X")
assert xw.__class__.__name__ == "BECDockArea"
assert len(gui.windows) == 2
gui_info = gui._dump()
mw_info = gui_info[mw._gui_id]
assert mw_info["title"] == "BEC Widgets"
assert mw_info["visible"]
xw_info = gui_info[xw._gui_id]
assert xw_info["title"] == "X"
assert xw_info["visible"]
gui.hide()
gui_info = gui._dump()
assert not any(windows["visible"] for windows in gui_info.values())
gui.show()
gui_info = gui._dump()
assert all(windows["visible"] for windows in gui_info.values())
assert gui.gui_is_alive()
gui.close()
assert not gui.gui_is_alive()
gui.start_server(wait=True)
assert gui.gui_is_alive()
# calling start multiple times should not change anything
gui.start_server(wait=True)
gui.start()
# gui.windows should have main, and main dock area should have same gui_id as before
assert len(gui.windows) == 1
assert gui.windows["main"].widget._gui_id == mw._gui_id
# communication should work, main dock area should have same id and be visible
gui_info = gui._dump()
assert gui_info[mw._gui_id]["visible"]
with pytest.raises(RuntimeError):
gui.main.delete()
yw = gui.new("Y")
assert len(gui.windows) == 2
yw.delete()
assert len(gui.windows) == 1
# check it is really deleted on server
gui_info = gui._dump()
assert yw._gui_id not in gui_info
def test_rpc_call_with_exception_in_safeslot_error_popup(connected_client_gui_obj, qtbot):
gui = connected_client_gui_obj
gui.main.add_dock("test")
with pytest.raises(ValueError):
gui.main.add_dock("test")
# time.sleep(0.1)

View File

@@ -7,8 +7,8 @@ from bec_lib.endpoints import MessageEndpoints
from bec_widgets.cli.client import BECFigure, BECImageShow, BECMotorMap, BECWaveform
def test_rpc_waveform1d_custom_curve(rpc_server_figure):
fig = BECFigure(rpc_server_figure)
def test_rpc_waveform1d_custom_curve(connected_client_figure):
fig = BECFigure(connected_client_figure)
ax = fig.plot()
curve = ax.plot(x=[1, 2, 3], y=[1, 2, 3])
@@ -20,8 +20,8 @@ def test_rpc_waveform1d_custom_curve(rpc_server_figure):
assert len(fig.widgets[ax._rpc_id].curves) == 1
def test_rpc_plotting_shortcuts_init_configs(rpc_server_figure, qtbot):
fig = BECFigure(rpc_server_figure)
def test_rpc_plotting_shortcuts_init_configs(connected_client_figure, qtbot):
fig = BECFigure(connected_client_figure)
plt = fig.plot(x_name="samx", y_name="bpm4i")
im = fig.image("eiger")
@@ -78,8 +78,8 @@ def test_rpc_plotting_shortcuts_init_configs(rpc_server_figure, qtbot):
}
def test_rpc_waveform_scan(rpc_server_figure, bec_client_lib):
fig = BECFigure(rpc_server_figure)
def test_rpc_waveform_scan(connected_client_figure, bec_client_lib):
fig = BECFigure(connected_client_figure)
# add 3 different curves to track
plt = fig.plot(x_name="samx", y_name="bpm4i")
@@ -109,8 +109,8 @@ def test_rpc_waveform_scan(rpc_server_figure, bec_client_lib):
assert plt_data["bpm4d-bpm4d"]["y"] == last_scan_data["bpm4d"]["bpm4d"].val
def test_rpc_image(rpc_server_figure, bec_client_lib):
fig = BECFigure(rpc_server_figure)
def test_rpc_image(connected_client_figure, bec_client_lib):
fig = BECFigure(connected_client_figure)
im = fig.image("eiger")
@@ -130,8 +130,8 @@ def test_rpc_image(rpc_server_figure, bec_client_lib):
np.testing.assert_equal(last_image_device, last_image_plot)
def test_rpc_motor_map(rpc_server_figure, bec_client_lib):
fig = BECFigure(rpc_server_figure)
def test_rpc_motor_map(connected_client_figure, bec_client_lib):
fig = BECFigure(connected_client_figure)
motor_map = fig.motor_map("samx", "samy")
@@ -159,9 +159,9 @@ def test_rpc_motor_map(rpc_server_figure, bec_client_lib):
)
def test_dap_rpc(rpc_server_figure, bec_client_lib, qtbot):
def test_dap_rpc(connected_client_figure, bec_client_lib, qtbot):
fig = BECFigure(rpc_server_figure)
fig = BECFigure(connected_client_figure)
plt = fig.plot(x_name="samx", y_name="bpm4i", dap="GaussianModel")
client = bec_client_lib
@@ -199,8 +199,8 @@ def test_dap_rpc(rpc_server_figure, bec_client_lib, qtbot):
qtbot.waitUntil(wait_for_fit, timeout=10000)
def test_removing_subplots(rpc_server_figure, bec_client_lib):
fig = BECFigure(rpc_server_figure)
def test_removing_subplots(connected_client_figure, bec_client_lib):
fig = BECFigure(connected_client_figure)
plt = fig.plot(x_name="samx", y_name="bpm4i", dap="GaussianModel")
im = fig.image(monitor="eiger")
mm = fig.motor_map(motor_x="samx", motor_y="samy")

View File

@@ -3,8 +3,8 @@ import pytest
from bec_widgets.cli.client import BECFigure, BECImageShow, BECMotorMap, BECWaveform
def test_rpc_register_list_connections(rpc_server_figure):
fig = BECFigure(rpc_server_figure)
def test_rpc_register_list_connections(connected_client_figure):
fig = BECFigure(connected_client_figure)
plt = fig.plot(x_name="samx", y_name="bpm4i")
im = fig.image("eiger")

View File

@@ -5,7 +5,7 @@ from pytestqt.exceptions import TimeoutError as QtBotTimeoutError
from qtpy.QtCore import QTimer
from qtpy.QtWidgets import QApplication
from bec_widgets.cli.rpc_register import RPCRegister
from bec_widgets.cli.rpc.rpc_register import RPCRegister
from bec_widgets.qt_utils import error_popups
from bec_widgets.utils import bec_dispatcher as bec_dispatcher_module

View File

@@ -4,7 +4,7 @@ from unittest import mock
import pytest
from bec_widgets.cli.client import BECFigure
from bec_widgets.cli.client_utils import BECGuiClientMixin, _start_plot_process
from bec_widgets.cli.client_utils import BECGuiClient, _start_plot_process
from bec_widgets.tests.utils import FakeDevice
@@ -63,15 +63,14 @@ def test_client_utils_passes_client_config_to_server(bec_dispatcher):
@contextmanager
def bec_client_mixin():
mixin = BECGuiClientMixin()
mixin = BECGuiClient()
mixin._client = bec_dispatcher.client
mixin._gui_id = "gui_id"
mixin.gui_is_alive = mock.MagicMock()
mixin.gui_is_alive.side_effect = [True]
try:
with mock.patch.object(mixin, "_start_update_script"):
yield mixin
yield mixin
finally:
mixin.close()
@@ -82,6 +81,5 @@ def test_client_utils_passes_client_config_to_server(bec_dispatcher):
wait=False
) # the started event will not be set, wait=True would block forever
mock_start_plot.assert_called_once_with(
"gui_id", BECGuiClientMixin, mixin._client._service_config.config, logger=mock.ANY
"gui_id", BECGuiClient, mixin._client._service_config.config, logger=mock.ANY
)
mixin._start_update_script.assert_called_once()

View File

@@ -0,0 +1,308 @@
import pytest
from qtpy.QtCore import QEasingCurve
from qtpy.QtWidgets import QPushButton, QVBoxLayout, QWidget
from bec_widgets.qt_utils.collapsible_panel_manager import (
CollapsiblePanelManager,
DimensionAnimator,
)
from bec_widgets.widgets.containers.layout_manager.layout_manager import LayoutManagerWidget
@pytest.fixture
def reference_widget(qtbot):
widget = QWidget()
layout = QVBoxLayout(widget)
btn = QPushButton("Reference")
layout.addWidget(btn)
qtbot.addWidget(widget)
widget.setVisible(True)
qtbot.waitExposed(widget)
return widget
@pytest.fixture
def layout_manager(qtbot, reference_widget):
manager = LayoutManagerWidget()
qtbot.addWidget(manager)
manager.add_widget(reference_widget, row=0, col=0)
manager.setVisible(True)
qtbot.waitExposed(manager)
return manager
@pytest.fixture
def panel_manager(layout_manager, reference_widget):
manager = CollapsiblePanelManager(layout_manager, reference_widget)
return manager
@pytest.fixture
def test_panel_widget(qtbot):
widget = QWidget()
qtbot.addWidget(widget)
return widget
def test_dimension_animator_width_setting(qtbot, test_panel_widget):
animator = DimensionAnimator(test_panel_widget, "left")
animator.panel_width = 100
assert animator.panel_width == 100
assert test_panel_widget.width() == 100
def test_dimension_animator_height_setting(qtbot, test_panel_widget):
animator = DimensionAnimator(test_panel_widget, "top")
animator.panel_height = 150
assert animator.panel_height == 150
assert test_panel_widget.height() == 150
def test_add_panel(panel_manager, test_panel_widget):
panel_manager.add_panel("left", test_panel_widget, target_size=200)
assert panel_manager.panels["left"]["widget"] == test_panel_widget
# Initially hidden
assert not test_panel_widget.isVisible()
assert test_panel_widget.maximumWidth() == 0
def test_add_panel_no_target_size(panel_manager, test_panel_widget):
panel_manager.add_panel("top", test_panel_widget)
assert panel_manager.panels["top"]["target_size"] == 150
assert not test_panel_widget.isVisible()
def test_add_panel_invalid_direction(panel_manager, test_panel_widget):
with pytest.raises(ValueError) as exc_info:
panel_manager.add_panel("invalid", test_panel_widget)
assert "Direction must be one of 'left', 'right', 'top', 'bottom'." in str(exc_info.value)
def test_toggle_panel_show(panel_manager, test_panel_widget):
panel_manager.add_panel("left", test_panel_widget, target_size=200)
assert not test_panel_widget.isVisible()
panel_manager.toggle_panel("left", animation=False)
assert test_panel_widget.isVisible()
def test_toggle_panel_hide(panel_manager, test_panel_widget):
panel_manager.add_panel("left", test_panel_widget, target_size=200)
panel_manager.toggle_panel("left", animation=False)
assert test_panel_widget.isVisible()
panel_manager.toggle_panel("left", animation=False)
assert not test_panel_widget.isVisible()
def test_toggle_panel_scale(panel_manager, test_panel_widget, reference_widget):
reference_widget.resize(800, 600)
panel_manager.add_panel("right", test_panel_widget)
panel_manager.toggle_panel("right", scale=0.25, animation=False)
assert test_panel_widget.isVisible()
assert test_panel_widget.maximumWidth() == 200
def test_toggle_panel_ensure_max(panel_manager, test_panel_widget):
panel_manager.add_panel("bottom", test_panel_widget, target_size=150)
panel_manager.toggle_panel("bottom", ensure_max=True, animation=False)
assert test_panel_widget.isVisible()
assert test_panel_widget.maximumHeight() == 150
panel_manager.toggle_panel("bottom", ensure_max=True, animation=False)
assert not test_panel_widget.isVisible()
assert test_panel_widget.maximumHeight() == 16777215
def test_toggle_panel_easing_curve(panel_manager, test_panel_widget):
panel_manager.add_panel("top", test_panel_widget, target_size=100, duration=500)
panel_manager.toggle_panel("top", easing_curve=QEasingCurve.OutBounce, animation=True)
assert panel_manager.animations.get(test_panel_widget) is not None
def test_toggle_nonexistent_panel(panel_manager):
with pytest.raises(ValueError) as exc_info:
panel_manager.toggle_panel("invalid")
assert "No panel found in direction 'invalid'." in str(exc_info.value)
def test_toggle_panel_without_animation(panel_manager, test_panel_widget):
panel_manager.add_panel("left", test_panel_widget, target_size=200)
panel_manager.toggle_panel("left", animation=False)
assert test_panel_widget.isVisible()
assert test_panel_widget.maximumWidth() == 200
panel_manager.toggle_panel("left", animation=False)
assert not test_panel_widget.isVisible()
def test_after_hide_reset(panel_manager, test_panel_widget):
panel_manager.add_panel("left", test_panel_widget, target_size=200)
panel_manager.toggle_panel("left", ensure_max=True, animation=False)
assert test_panel_widget.isVisible()
panel_manager.toggle_panel("left", ensure_max=True, animation=False)
assert not test_panel_widget.isVisible()
assert test_panel_widget.minimumWidth() == 0
assert test_panel_widget.maximumWidth() == 0
def test_toggle_panel_repeated(panel_manager, test_panel_widget):
panel_manager.add_panel("right", test_panel_widget, target_size=200)
panel_manager.toggle_panel("right", animation=False)
assert test_panel_widget.isVisible()
panel_manager.toggle_panel("right", animation=False)
assert not test_panel_widget.isVisible()
panel_manager.toggle_panel("right", animation=False)
assert test_panel_widget.isVisible()
def test_toggle_panel_with_custom_duration(panel_manager, test_panel_widget):
panel_manager.add_panel("bottom", test_panel_widget, target_size=150, duration=1000)
panel_manager.toggle_panel("bottom", duration=2000, animation=True)
animation = panel_manager.animations.get(test_panel_widget)
assert animation is not None
assert animation.duration() == 2000
def test_toggle_panel_ensure_max_scale(panel_manager, test_panel_widget, reference_widget):
reference_widget.resize(1000, 800)
panel_manager.add_panel("top", test_panel_widget)
panel_manager.toggle_panel("top", ensure_max=True, scale=0.5, animation=False)
assert test_panel_widget.isVisible()
assert test_panel_widget.maximumHeight() == 400
def test_no_animation_mode(panel_manager, test_panel_widget):
panel_manager.add_panel("left", test_panel_widget, target_size=200)
panel_manager.toggle_panel("left", animation=False)
assert test_panel_widget.isVisible()
panel_manager.toggle_panel("left", animation=False)
assert not test_panel_widget.isVisible()
def test_toggle_panel_nondefault_easing(panel_manager, test_panel_widget):
panel_manager.add_panel("right", test_panel_widget, target_size=200)
panel_manager.toggle_panel("right", easing_curve=QEasingCurve.InCurve, animation=True)
animation = panel_manager.animations.get(test_panel_widget)
assert animation is not None
assert animation.easingCurve() == QEasingCurve.InCurve
def test_toggle_panel_ensure_max_no_animation(panel_manager, test_panel_widget):
panel_manager.add_panel("bottom", test_panel_widget, target_size=150)
panel_manager.toggle_panel("bottom", ensure_max=True, animation=False)
assert test_panel_widget.isVisible()
assert test_panel_widget.maximumHeight() == 150
panel_manager.toggle_panel("bottom", ensure_max=True, animation=False)
assert not test_panel_widget.isVisible()
assert test_panel_widget.maximumHeight() == 16777215
def test_toggle_panel_new_target_size(panel_manager, test_panel_widget):
panel_manager.add_panel("left", test_panel_widget, target_size=200)
panel_manager.toggle_panel("left", target_size=300, animation=False)
assert test_panel_widget.isVisible()
assert test_panel_widget.maximumWidth() == 300
panel_manager.toggle_panel("left", animation=False)
assert not test_panel_widget.isVisible()
def test_toggle_panel_new_duration(panel_manager, test_panel_widget):
panel_manager.add_panel("left", test_panel_widget, target_size=200, duration=300)
panel_manager.toggle_panel("left", duration=1000, animation=True)
animation = panel_manager.animations.get(test_panel_widget)
assert animation.duration() == 1000
def test_toggle_panel_wrong_direction(panel_manager):
with pytest.raises(ValueError) as exc:
panel_manager.toggle_panel("unknown_direction")
assert "No panel found in direction 'unknown_direction'." in str(exc.value)
def test_toggle_panel_no_panels(panel_manager):
with pytest.raises(ValueError) as exc:
panel_manager.toggle_panel("top")
assert "No panel found in direction 'top'." in str(exc.value)
def test_multiple_panels_interaction(panel_manager):
widget_left = QWidget()
widget_right = QWidget()
panel_manager.add_panel("left", widget_left, target_size=200)
panel_manager.add_panel("right", widget_right, target_size=300)
panel_manager.toggle_panel("left", animation=False)
assert widget_left.isVisible()
panel_manager.toggle_panel("right", animation=False)
assert widget_right.isVisible()
panel_manager.toggle_panel("left", animation=False)
assert not widget_left.isVisible()
assert widget_right.isVisible()
panel_manager.toggle_panel("right", animation=False)
assert not widget_right.isVisible()
def test_panel_manager_custom_easing(panel_manager, test_panel_widget):
panel_manager.add_panel("top", test_panel_widget, target_size=150)
panel_manager.toggle_panel("top", easing_curve=QEasingCurve.InQuad, animation=True)
animation = panel_manager.animations.get(test_panel_widget)
assert animation is not None
assert animation.easingCurve() == QEasingCurve.InQuad
def test_toggle_panel_scale_no_animation(panel_manager, test_panel_widget, reference_widget):
reference_widget.resize(400, 300)
panel_manager.add_panel("bottom", test_panel_widget)
panel_manager.toggle_panel("bottom", scale=0.5, animation=False)
assert test_panel_widget.isVisible()
assert test_panel_widget.maximumHeight() == 150
panel_manager.toggle_panel("bottom", animation=False)
assert not test_panel_widget.isVisible()
def test_after_hide_reset_properties(panel_manager, test_panel_widget):
panel_manager.add_panel("left", test_panel_widget, target_size=200)
panel_manager.toggle_panel("left", ensure_max=True, animation=False)
panel_manager.toggle_panel("left", ensure_max=True, animation=False)
assert not test_panel_widget.isVisible()
assert test_panel_widget.minimumWidth() == 0
assert test_panel_widget.maximumWidth() == 0
def test_toggle_panel_no_animation_show_only(panel_manager, test_panel_widget):
panel_manager.add_panel("right", test_panel_widget, target_size=100)
panel_manager.toggle_panel("right", animation=False)
assert test_panel_widget.isVisible()
assert test_panel_widget.maximumWidth() == 100
def test_toggle_panel_no_animation_hide_only(panel_manager, test_panel_widget):
panel_manager.add_panel("left", test_panel_widget, target_size=100)
panel_manager.toggle_panel("left", animation=False)
assert test_panel_widget.isVisible()
panel_manager.toggle_panel("left", animation=False)
assert not test_panel_widget.isVisible()
def test_toggle_panel_easing_inout(panel_manager, test_panel_widget):
panel_manager.add_panel("top", test_panel_widget, target_size=120)
panel_manager.toggle_panel("top", easing_curve=QEasingCurve.InOutQuad, animation=True)
animation = panel_manager.animations.get(test_panel_widget)
assert animation is not None
assert animation.easingCurve() == QEasingCurve.InOutQuad
def test_toggle_panel_ensure_max_width(panel_manager, test_panel_widget):
panel_manager.add_panel("right", test_panel_widget, target_size=200)
panel_manager.toggle_panel("right", ensure_max=True, animation=False)
assert test_panel_widget.isVisible()
assert test_panel_widget.maximumWidth() == 200
def test_toggle_panel_invalid_direction_twice(panel_manager, test_panel_widget):
panel_manager.add_panel("left", test_panel_widget, target_size=200)
with pytest.raises(ValueError) as exc_info:
panel_manager.toggle_panel("invalid_direction")
assert "No panel found in direction 'invalid_direction'." in str(exc_info.value)

View File

@@ -70,7 +70,7 @@ def test_client_generator_with_black_formatting():
import enum
from typing import Literal, Optional, overload
from bec_widgets.cli.client_utils import BECGuiClientMixin, RPCBase, rpc_call
from bec_widgets.cli.rpc.rpc_base import RPCBase, rpc_call
# pylint: skip-file

View File

@@ -0,0 +1,368 @@
from typing import Optional
from unittest.mock import patch
import pytest
from qtpy.QtWidgets import QLabel, QPushButton, QWidget
from bec_widgets.widgets.containers.layout_manager.layout_manager import LayoutManagerWidget
class MockWidgetHandler:
def create_widget(self, widget_type: str) -> Optional[QWidget]:
if widget_type == "ButtonWidget":
return QPushButton()
elif widget_type == "LabelWidget":
return QLabel()
else:
return None
@pytest.fixture
def mock_widget_handler():
handler = MockWidgetHandler()
with patch(
"bec_widgets.widgets.containers.layout_manager.layout_manager.widget_handler", handler
):
yield handler
@pytest.fixture
def layout_manager(qtbot, mock_widget_handler):
widget = LayoutManagerWidget()
qtbot.addWidget(widget)
qtbot.waitExposed(widget)
yield widget
def test_add_widget_empty_position(layout_manager):
"""Test adding a widget to an empty position without shifting."""
btn1 = QPushButton("Button 1")
layout_manager.add_widget(btn1, row=0, col=0)
assert layout_manager.get_widget(0, 0) == btn1
assert layout_manager.widget_positions[btn1] == (0, 0, 1, 1)
assert layout_manager.position_widgets[(0, 0)] == btn1
def test_add_widget_occupied_position(layout_manager):
"""Test adding a widget to an occupied position with shifting (default direction right)."""
btn1 = QPushButton("Button 1")
btn2 = QPushButton("Button 2")
layout_manager.add_widget(btn1, row=0, col=0)
layout_manager.add_widget(btn2, row=0, col=0) # This should shift btn1 to the right
assert layout_manager.get_widget(0, 0) == btn2
assert layout_manager.get_widget(0, 1) == btn1
assert layout_manager.widget_positions[btn2] == (0, 0, 1, 1)
assert layout_manager.widget_positions[btn1] == (0, 1, 1, 1)
def test_add_widget_directional_shift_down(layout_manager):
"""Test adding a widget to an occupied position but shifting down instead of right."""
btn1 = QPushButton("Button 1")
btn2 = QPushButton("Button 2")
btn3 = QPushButton("Button 3")
layout_manager.add_widget(btn1, row=0, col=0)
layout_manager.add_widget(btn2, row=0, col=0) # Shifts btn1 to the right by default
# Now add btn3 at (0,1) but shift direction is down, so it should push btn1 down.
layout_manager.add_widget(btn3, row=0, col=1, shift_direction="down")
assert layout_manager.get_widget(0, 0) == btn2
assert layout_manager.get_widget(0, 1) == btn3
assert layout_manager.get_widget(1, 1) == btn1
def test_remove_widget_by_position(layout_manager):
"""Test removing a widget by specifying its row and column."""
btn1 = QPushButton("Button 1")
layout_manager.add_widget(btn1, row=0, col=0)
layout_manager.remove(row=0, col=0)
assert layout_manager.get_widget(0, 0) is None
assert btn1 not in layout_manager.widget_positions
def test_move_widget_with_shift(layout_manager):
"""Test moving a widget to an occupied position, triggering a shift."""
btn1 = QPushButton("Button 1")
btn2 = QPushButton("Button 2")
btn3 = QPushButton("Button 3")
layout_manager.add_widget(btn1, row=0, col=0)
layout_manager.add_widget(btn2, row=0, col=1)
layout_manager.add_widget(btn3, row=1, col=0)
layout_manager.move_widget(old_row=0, old_col=0, new_row=0, new_col=1, shift_direction="right")
assert layout_manager.get_widget(0, 1) == btn1
assert layout_manager.get_widget(0, 2) == btn2
assert layout_manager.get_widget(1, 0) == btn3
def test_move_widget_without_shift(layout_manager):
"""Test moving a widget to an occupied position without shifting."""
btn1 = QPushButton("Button 1")
btn2 = QPushButton("Button 2")
layout_manager.add_widget(btn1, row=0, col=0)
layout_manager.add_widget(btn2, row=0, col=1)
with pytest.raises(ValueError) as exc_info:
layout_manager.move_widget(old_row=0, old_col=0, new_row=0, new_col=1, shift=False)
assert "Position (0, 1) is already occupied." in str(exc_info.value)
def test_change_layout_num_cols(layout_manager):
"""Test changing the layout by specifying only the number of columns."""
btn1 = QPushButton("Button 1")
btn2 = QPushButton("Button 2")
btn3 = QPushButton("Button 3")
btn4 = QPushButton("Button 4")
layout_manager.add_widget(btn1)
layout_manager.add_widget(btn2)
layout_manager.add_widget(btn3)
layout_manager.add_widget(btn4)
layout_manager.change_layout(num_cols=2)
assert layout_manager.get_widget(0, 0) == btn1
assert layout_manager.get_widget(0, 1) == btn2
assert layout_manager.get_widget(1, 0) == btn3
assert layout_manager.get_widget(1, 1) == btn4
def test_change_layout_num_rows(layout_manager):
"""Test changing the layout by specifying only the number of rows."""
btn_list = [QPushButton(f"Button {i}") for i in range(1, 7)]
for btn in btn_list:
layout_manager.add_widget(btn)
layout_manager.change_layout(num_rows=3)
assert layout_manager.get_widget(0, 0) == btn_list[0]
assert layout_manager.get_widget(0, 1) == btn_list[1]
assert layout_manager.get_widget(1, 0) == btn_list[2]
assert layout_manager.get_widget(1, 1) == btn_list[3]
assert layout_manager.get_widget(2, 0) == btn_list[4]
assert layout_manager.get_widget(2, 1) == btn_list[5]
def test_shift_all_widgets(layout_manager):
"""Test shifting all widgets down and then up."""
btn1 = QPushButton("Button 1")
btn2 = QPushButton("Button 2")
layout_manager.add_widget(btn1, row=0, col=0)
layout_manager.add_widget(btn2, row=0, col=1)
# Shift all down
layout_manager.shift_all_widgets(direction="down")
assert layout_manager.get_widget(1, 0) == btn1
assert layout_manager.get_widget(1, 1) == btn2
# Shift all up
layout_manager.shift_all_widgets(direction="up")
assert layout_manager.get_widget(0, 0) == btn1
assert layout_manager.get_widget(0, 1) == btn2
def test_add_widget_auto_position(layout_manager):
"""Test adding widgets without specifying row and column."""
btn1 = QPushButton("Button 1")
btn2 = QPushButton("Button 2")
layout_manager.add_widget(btn1)
layout_manager.add_widget(btn2)
assert layout_manager.get_widget(0, 0) == btn1
assert layout_manager.get_widget(0, 1) == btn2
def test_clear_layout(layout_manager):
"""Test clearing the entire layout."""
btn1 = QPushButton("Button 1")
btn2 = QPushButton("Button 2")
layout_manager.add_widget(btn1)
layout_manager.add_widget(btn2)
layout_manager.clear_layout()
assert layout_manager.get_widget(0, 0) is None
assert layout_manager.get_widget(0, 1) is None
assert len(layout_manager.widget_positions) == 0
def test_add_widget_with_span(layout_manager):
"""Test adding a widget with rowspan and colspan."""
btn1 = QPushButton("Button 1")
layout_manager.add_widget(btn1, row=0, col=0, rowspan=2, colspan=2)
assert layout_manager.widget_positions[btn1] == (0, 0, 2, 2)
def test_add_widget_overlap_with_span(layout_manager):
"""
Test adding a widget that overlaps with an existing widget's span.
The code will attempt to shift widgets accordingly.
"""
btn1 = QPushButton("Button 1")
btn2 = QPushButton("Button 2")
layout_manager.add_widget(btn1, row=0, col=0, rowspan=2, colspan=1)
layout_manager.add_widget(btn2, row=1, col=1, shift_direction="right")
assert layout_manager.get_widget(0, 0) == btn1
assert layout_manager.widget_positions[btn1] == (0, 0, 2, 1)
assert layout_manager.get_widget(1, 1) == btn2
assert layout_manager.widget_positions[btn2] == (1, 1, 1, 1)
@pytest.mark.parametrize(
"position, btn3_coords",
[("left", (1, 0)), ("right", (1, 2)), ("top", (0, 1)), ("bottom", (2, 1))],
)
def test_add_widget_relative(layout_manager, position, btn3_coords):
"""Test adding a widget relative to an existing widget using parameterized data."""
expected_row, expected_col = btn3_coords
btn1 = QPushButton("Button 1")
btn2 = QPushButton("Button 2")
btn3 = QPushButton("Button 3")
layout_manager.add_widget(btn1, row=0, col=0)
layout_manager.add_widget(btn2, row=1, col=1)
layout_manager.add_widget_relative(btn3, reference_widget=btn2, position=position)
assert layout_manager.get_widget(0, 0) == btn1
assert layout_manager.get_widget(1, 1) == btn2
assert layout_manager.get_widget(expected_row, expected_col) == btn3
def test_add_widget_relative_invalid_position(layout_manager):
"""Test adding a widget relative to an existing widget with an invalid position."""
btn1 = QPushButton("Button 1")
btn2 = QPushButton("Button 2")
layout_manager.add_widget(btn1, row=1, col=1)
with pytest.raises(ValueError) as exc_info:
layout_manager.add_widget_relative(btn2, reference_widget=btn1, position="invalid_position")
assert "Invalid position. Choose from 'left', 'right', 'top', 'bottom'." in str(exc_info.value)
btn2.deleteLater()
def test_add_widget_relative_to_nonexistent_widget(layout_manager):
"""Test adding a widget relative to a widget that does not exist in the layout."""
btn1 = QPushButton("Button 1")
btn2 = QPushButton("Button 2")
with pytest.raises(ValueError) as exc_info:
layout_manager.add_widget_relative(btn2, reference_widget=btn1, position="left")
assert "Reference widget not found in layout." in str(exc_info.value)
btn1.deleteLater()
btn2.deleteLater()
def test_add_widget_relative_with_shift(layout_manager):
"""Test adding a widget relative to an existing widget with shifting."""
btn1 = QPushButton("Button 1")
btn2 = QPushButton("Button 2")
btn3 = QPushButton("Button 3")
layout_manager.add_widget(btn1, row=1, col=1)
layout_manager.add_widget(btn2, row=1, col=0)
layout_manager.add_widget_relative(
btn3, reference_widget=btn1, position="left", shift_direction="right"
)
assert layout_manager.get_widget(0, 0) == btn3
assert layout_manager.get_widget(1, 1) == btn2
assert layout_manager.get_widget(0, 1) == btn1
def test_move_widget_by_object(layout_manager):
"""Test moving a widget using the widget object."""
btn1 = QPushButton("Button 1")
btn2 = QPushButton("Button 2")
layout_manager.add_widget(btn1)
layout_manager.add_widget(btn2, row=0, col=1)
layout_manager.move_widget_by_object(btn1, new_row=1, new_col=1)
# the grid is reindex after each move, so the new positions are (0,0) and (1,0), because visually there is only one column
assert layout_manager.get_widget(1, 0) == btn1
assert layout_manager.get_widget(0, 0) == btn2
def test_move_widget_by_coords(layout_manager):
"""Test moving a widget using its current coordinates."""
btn1 = QPushButton("Button 1")
btn2 = QPushButton("Button 2")
layout_manager.add_widget(btn1)
layout_manager.add_widget(btn2, row=0, col=1)
layout_manager.move_widget_by_coords(0, 0, 1, 0, shift_direction="down")
assert layout_manager.get_widget(1, 0) == btn1
assert layout_manager.get_widget(0, 1) == btn2
def test_change_layout_no_arguments(layout_manager):
"""Test changing the layout with no arguments (should do nothing)."""
btn1 = QPushButton("Button 1")
layout_manager.add_widget(btn1, row=0, col=0)
layout_manager.change_layout()
assert layout_manager.get_widget(0, 0) == btn1
assert len(layout_manager.widget_positions) == 1
def test_remove_nonexistent_widget(layout_manager):
"""Test removing a widget that doesn't exist in the layout."""
with pytest.raises(ValueError) as exc_info:
layout_manager.remove(row=0, col=0)
assert "No widget found at position (0, 0)." in str(exc_info.value)
def test_reindex_grid_after_removal(layout_manager):
"""Test reindexing the grid after removing a widget."""
btn1 = QPushButton("Button 1")
btn2 = QPushButton("Button 2")
layout_manager.add_widget(btn1)
layout_manager.add_widget(btn2, row=0, col=1)
layout_manager.remove_widget(btn1)
layout_manager.reindex_grid()
# After removal and reindex, btn2 should shift to (0,0)
assert layout_manager.get_widget(0, 0) == btn2
assert layout_manager.widget_positions[btn2] == (0, 0, 1, 1)
def test_shift_all_widgets_up_at_top_row(layout_manager):
"""Test shifting all widgets up when they are already at the top row."""
btn1 = QPushButton("Button 1")
btn2 = QPushButton("Button 2")
layout_manager.add_widget(btn1, row=0, col=0)
layout_manager.add_widget(btn2, row=0, col=1)
# Shifting up should cause an error since widgets can't move above row 0
with pytest.raises(ValueError) as exc_info:
layout_manager.shift_all_widgets(direction="up")
assert "Shifting widgets out of grid boundaries." in str(exc_info.value)

View File

@@ -1,4 +1,4 @@
from bec_widgets.cli.rpc_register import RPCRegister
from bec_widgets.cli.rpc.rpc_register import RPCRegister
class FakeObject:

View File

@@ -1,4 +1,4 @@
from bec_widgets.cli.rpc_wigdet_handler import RPCWidgetHandler
from bec_widgets.cli.rpc.rpc_widget_handler import RPCWidgetHandler
def test_rpc_widget_handler():

View File

@@ -1,8 +1,17 @@
# pylint: disable = no-name-in-module,missing-class-docstring, missing-module-docstring
import pytest
from qtpy.QtWidgets import QComboBox, QLineEdit, QSpinBox, QTableWidget, QVBoxLayout, QWidget
from qtpy.QtCore import Qt
from qtpy.QtWidgets import (
QComboBox,
QLineEdit,
QSpinBox,
QTableWidget,
QTableWidgetItem,
QVBoxLayout,
QWidget,
)
from bec_widgets.utils.widget_io import WidgetHierarchy
from bec_widgets.utils.widget_io import WidgetHierarchy, WidgetIO
@pytest.fixture(scope="function")
@@ -22,6 +31,12 @@ def example_widget(qtbot):
# Add text items to the combo box
combo_box.addItems(["Option 1", "Option 2", "Option 3"])
# Populate the table widget
table_widget.setItem(0, 0, QTableWidgetItem("Initial A"))
table_widget.setItem(0, 1, QTableWidgetItem("Initial B"))
table_widget.setItem(1, 0, QTableWidgetItem("Initial C"))
table_widget.setItem(1, 1, QTableWidgetItem("Initial D"))
qtbot.addWidget(main_widget)
qtbot.waitExposed(main_widget)
yield main_widget
@@ -88,3 +103,73 @@ def test_export_import_config(example_widget):
assert exported_config_full == expected_full
assert exported_config_reduced == expected_reduced
def test_widget_io_get_set_value(example_widget):
# Extract widgets
line_edit = example_widget.findChild(QLineEdit)
combo_box = example_widget.findChild(QComboBox)
table_widget = example_widget.findChild(QTableWidget)
spin_box = example_widget.findChild(QSpinBox)
# Check initial values
assert WidgetIO.get_value(line_edit) == ""
assert WidgetIO.get_value(combo_box) == 0 # first index
assert WidgetIO.get_value(table_widget) == [
["Initial A", "Initial B"],
["Initial C", "Initial D"],
]
assert WidgetIO.get_value(spin_box) == 0
# Set new values
WidgetIO.set_value(line_edit, "Hello")
WidgetIO.set_value(combo_box, "Option 2")
WidgetIO.set_value(table_widget, [["X", "Y"], ["Z", "W"]])
WidgetIO.set_value(spin_box, 5)
# Check updated values
assert WidgetIO.get_value(line_edit) == "Hello"
assert WidgetIO.get_value(combo_box, as_string=True) == "Option 2"
assert WidgetIO.get_value(table_widget) == [["X", "Y"], ["Z", "W"]]
assert WidgetIO.get_value(spin_box) == 5
def test_widget_io_signal(qtbot, example_widget):
# Extract widgets
line_edit = example_widget.findChild(QLineEdit)
combo_box = example_widget.findChild(QComboBox)
spin_box = example_widget.findChild(QSpinBox)
table_widget = example_widget.findChild(QTableWidget)
# We'll store changes in a list to verify the slot is called
changes = []
def universal_slot(w, val):
changes.append((w, val))
# Connect signals
WidgetIO.connect_widget_change_signal(line_edit, universal_slot)
WidgetIO.connect_widget_change_signal(combo_box, universal_slot)
WidgetIO.connect_widget_change_signal(spin_box, universal_slot)
WidgetIO.connect_widget_change_signal(table_widget, universal_slot)
# Trigger changes
line_edit.setText("NewText")
qtbot.waitUntil(lambda: len(changes) > 0)
assert changes[-1][1] == "NewText"
combo_box.setCurrentIndex(2)
qtbot.waitUntil(lambda: len(changes) > 1)
# combo_box change should give the current index or value
# We set "Option 3" is index 2
assert changes[-1][1] == 2 or changes[-1][1] == "Option 3"
spin_box.setValue(42)
qtbot.waitUntil(lambda: len(changes) > 2)
assert changes[-1][1] == 42
# For the table widget, changing a cell triggers cellChanged
table_widget.setItem(0, 0, QTableWidgetItem("ChangedCell"))
qtbot.waitUntil(lambda: len(changes) > 3)
# The entire table value should be retrieved
assert changes[-1][1][0][0] == "ChangedCell"