1
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2026-04-11 19:20:53 +02:00

Compare commits

...

2 Commits

11 changed files with 1865 additions and 158 deletions

View File

@@ -0,0 +1,67 @@
from qtpy import QtCore, QtWidgets
from bec_widgets.examples.device_manager_view.device_manager_view import DeviceManagerView
from bec_widgets.widgets.containers.advanced_dock_area.advanced_dock_area import AdvancedDockArea
class BECMainApp(QtWidgets.QWidget):
def __init__(self, parent=None):
super().__init__(parent)
# Main layout
layout = QtWidgets.QVBoxLayout(self)
layout.setContentsMargins(0, 0, 0, 0)
layout.setSpacing(0)
# Tab widget as central area
self.tabs = QtWidgets.QTabWidget(self)
self.tabs.setContentsMargins(0, 0, 0, 0)
self.tabs.setTabPosition(QtWidgets.QTabWidget.West) # Tabs on the left side
layout.addWidget(self.tabs)
# Add DM
self._add_device_manager_view()
# Add Plot area
self._add_ad_dockarea()
# Adjust size of tab bar
# TODO not yet properly working, tabs a spread across the full length, to be checked!
tab_bar = self.tabs.tabBar()
tab_bar.setFixedWidth(tab_bar.sizeHint().width())
def _add_device_manager_view(self) -> None:
self.device_manager_view = DeviceManagerView(parent=self)
self.add_tab(self.device_manager_view, "Device Manager")
def _add_ad_dockarea(self) -> None:
self.advanced_dock_area = AdvancedDockArea(parent=self)
self.add_tab(self.advanced_dock_area, "Plot Area")
def add_tab(self, widget: QtWidgets.QWidget, title: str):
"""Add a custom QWidget as a tab."""
tab_container = QtWidgets.QWidget()
tab_layout = QtWidgets.QVBoxLayout(tab_container)
tab_layout.setContentsMargins(0, 0, 0, 0)
tab_layout.setSpacing(0)
tab_layout.addWidget(widget)
self.tabs.addTab(tab_container, title)
if __name__ == "__main__":
import sys
from bec_lib.bec_yaml_loader import yaml_load
from bec_qthemes import apply_theme
app = QtWidgets.QApplication(sys.argv)
apply_theme("light")
win = BECMainApp()
config_path = "/Users/appel_c/work_psi_awi/bec_workspace/csaxs_bec/csaxs_bec/device_configs/first_light.yaml"
cfg = yaml_load(config_path)
cfg.update({"device_will_fail": {"name": "device_will_fail", "some_param": 1}})
win.device_manager_view.device_table_view.set_device_config(cfg)
win.resize(1920, 1080)
win.show()
sys.exit(app.exec_())

View File

@@ -0,0 +1,488 @@
from __future__ import annotations
import os
from typing import TYPE_CHECKING, List
import PySide6QtAds as QtAds
import yaml
from bec_lib.bec_yaml_loader import yaml_load
from bec_lib.file_utils import DeviceConfigWriter
from bec_lib.logger import bec_logger
from bec_lib.plugin_helper import plugin_package_name, plugin_repo_path
from bec_qthemes import apply_theme
from PySide6QtAds import CDockManager, CDockWidget
from qtpy.QtCore import Qt, QTimer
from qtpy.QtWidgets import QFileDialog, QMessageBox, QSplitter, QVBoxLayout, QWidget
from bec_widgets import BECWidget
from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.utils.toolbars.actions import MaterialIconAction
from bec_widgets.utils.toolbars.bundles import ToolbarBundle
from bec_widgets.utils.toolbars.toolbar import ModularToolBar
from bec_widgets.widgets.containers.advanced_dock_area.advanced_dock_area import AdvancedDockArea
from bec_widgets.widgets.control.device_manager.components import (
DeviceTableView,
DMConfigView,
DMOphydTest,
DocstringView,
)
if TYPE_CHECKING:
from bec_lib.client import BECClient
logger = bec_logger.logger
def set_splitter_weights(splitter: QSplitter, weights: List[float]) -> None:
"""
Apply initial sizes to a splitter using weight ratios, e.g. [1,3,2,1].
Works for horizontal or vertical splitters and sets matching stretch factors.
"""
def apply():
n = splitter.count()
if n == 0:
return
w = list(weights[:n]) + [1] * max(0, n - len(weights))
w = [max(0.0, float(x)) for x in w]
tot_w = sum(w)
if tot_w <= 0:
w = [1.0] * n
tot_w = float(n)
total_px = (
splitter.width() if splitter.orientation() == Qt.Horizontal else splitter.height()
)
if total_px < 2:
QTimer.singleShot(0, apply)
return
sizes = [max(1, int(total_px * (wi / tot_w))) for wi in w]
diff = total_px - sum(sizes)
if diff != 0:
idx = max(range(n), key=lambda i: w[i])
sizes[idx] = max(1, sizes[idx] + diff)
splitter.setSizes(sizes)
for i, wi in enumerate(w):
splitter.setStretchFactor(i, max(1, int(round(wi * 100))))
QTimer.singleShot(0, apply)
class DeviceManagerView(BECWidget, QWidget):
def __init__(self, parent=None, *args, **kwargs):
super().__init__(parent=parent, client=None, *args, **kwargs)
# Top-level layout hosting a toolbar and the dock manager
self._root_layout = QVBoxLayout(self)
self._root_layout.setContentsMargins(0, 0, 0, 0)
self._root_layout.setSpacing(0)
self.dock_manager = CDockManager(self)
self._root_layout.addWidget(self.dock_manager)
# Available Resources Widget
self.available_devices = QWidget(self)
self.available_devices_dock = QtAds.CDockWidget("Available Devices", self)
self.available_devices_dock.setWidget(self.available_devices)
# Device Table View widget
self.device_table_view = DeviceTableView(self)
self.device_table_view_dock = QtAds.CDockWidget("Device Table", self)
self.device_table_view_dock.setWidget(self.device_table_view)
# Device Config View widget
self.dm_config_view = DMConfigView(self)
self.dm_config_view_dock = QtAds.CDockWidget("Device Config View", self)
self.dm_config_view_dock.setWidget(self.dm_config_view)
# Docstring View
self.dm_docs_view = DocstringView(self)
self.dm_docs_view_dock = QtAds.CDockWidget("Docstring View", self)
self.dm_docs_view_dock.setWidget(self.dm_docs_view)
# Ophyd Test view
self.ophyd_test_view = DMOphydTest(self)
self.ophyd_test_dock_view = QtAds.CDockWidget("Ophyd Test View", self)
self.ophyd_test_dock_view.setWidget(self.ophyd_test_view)
# Arrange widgets within the QtAds dock manager
# Central widget area
self.central_dock_area = self.dock_manager.setCentralWidget(self.device_table_view_dock)
self.dock_manager.addDockWidget(
QtAds.DockWidgetArea.BottomDockWidgetArea,
self.dm_docs_view_dock,
self.central_dock_area,
)
# Left Area
self.left_dock_area = self.dock_manager.addDockWidget(
QtAds.DockWidgetArea.LeftDockWidgetArea, self.available_devices_dock
)
self.dock_manager.addDockWidget(
QtAds.DockWidgetArea.BottomDockWidgetArea, self.dm_config_view_dock, self.left_dock_area
)
# Right area
self.dock_manager.addDockWidget(
QtAds.DockWidgetArea.RightDockWidgetArea, self.ophyd_test_dock_view
)
for dock in self.dock_manager.dockWidgets():
# dock.setFeature(CDockWidget.DockWidgetDeleteOnClose, True)#TODO implement according to MonacoDock or AdvancedDockArea
# dock.setFeature(CDockWidget.CustomCloseHandling, True) #TODO same
dock.setFeature(CDockWidget.DockWidgetClosable, False)
dock.setFeature(CDockWidget.DockWidgetFloatable, False)
dock.setFeature(CDockWidget.DockWidgetMovable, False)
# Fetch all dock areas of the dock widgets (on our case always one dock area)
for dock in self.dock_manager.dockWidgets():
area = dock.dockAreaWidget()
area.titleBar().setVisible(False)
# Apply stretch after the layout is done
self.set_default_view([2, 8, 2], [3, 1])
# self.set_default_view([2, 8, 2], [2, 2, 4])
# Connect slots
self.device_table_view.selected_device.connect(self.dm_config_view.on_select_config)
self.device_table_view.selected_device.connect(self.dm_docs_view.on_select_config)
self.ophyd_test_view.device_validated.connect(
self.device_table_view.update_device_validation
)
self.device_table_view.device_configs_added.connect(self.ophyd_test_view.add_device_configs)
self._add_toolbar()
def _add_toolbar(self):
self.toolbar = ModularToolBar(self)
# Add IO actions
self._add_io_actions()
self._add_table_actions()
self.toolbar.show_bundles(["IO", "Table"])
self._root_layout.insertWidget(0, self.toolbar)
def _add_io_actions(self):
# Create IO bundle
io_bundle = ToolbarBundle("IO", self.toolbar.components)
# Add load config from plugin dir
self.toolbar.add_bundle(io_bundle)
load = MaterialIconAction(
icon_name="file_open", parent=self, tooltip="Load configuration file from disk"
)
self.toolbar.components.add_safe("load", load)
load.action.triggered.connect(self._load_file_action)
io_bundle.add_action("load")
# Add safe to disk
safe_to_disk = MaterialIconAction(
icon_name="file_save", parent=self, tooltip="Save config to disk"
)
self.toolbar.components.add_safe("safe_to_disk", safe_to_disk)
safe_to_disk.action.triggered.connect(self._safe_to_disk_action)
io_bundle.add_action("safe_to_disk")
# Add load config from redis
load_redis = MaterialIconAction(
icon_name="cached", parent=self, tooltip="Load current config from Redis"
)
load_redis.action.triggered.connect(self._load_redis_action)
self.toolbar.components.add_safe("load_redis", load_redis)
io_bundle.add_action("load_redis")
# Update config action
update_config_redis = MaterialIconAction(
icon_name="cloud_upload", parent=self, tooltip="Update current config in Redis"
)
update_config_redis.action.triggered.connect(self._update_redis_action)
self.toolbar.components.add_safe("update_config_redis", update_config_redis)
io_bundle.add_action("update_config_redis")
# Table actions
def _add_table_actions(self) -> None:
table_bundle = ToolbarBundle("Table", self.toolbar.components)
# Add load config from plugin dir
self.toolbar.add_bundle(table_bundle)
# Reset composed view
reset_composed = MaterialIconAction(
icon_name="delete_sweep", parent=self, tooltip="Reset current composed config view"
)
reset_composed.action.triggered.connect(self._reset_composed_view)
self.toolbar.components.add_safe("reset_composed", reset_composed)
table_bundle.add_action("reset_composed")
# Add device
add_device = MaterialIconAction(icon_name="add", parent=self, tooltip="Add new device")
add_device.action.triggered.connect(self._add_device_action)
self.toolbar.components.add_safe("add_device", add_device)
table_bundle.add_action("add_device")
# Remove device
remove_device = MaterialIconAction(icon_name="remove", parent=self, tooltip="Remove device")
remove_device.action.triggered.connect(self._remove_device_action)
self.toolbar.components.add_safe("remove_device", remove_device)
table_bundle.add_action("remove_device")
# Rerun validation
rerun_validation = MaterialIconAction(
icon_name="checklist", parent=self, tooltip="Run device validation on selected devices"
)
rerun_validation.action.triggered.connect(self._rerun_validation_action)
self.toolbar.components.add_safe("rerun_validation", rerun_validation)
table_bundle.add_action("rerun_validation")
# Most likly, no actions on available devices
# Actions (vielleicht bundle fuer available devices )
# - reset composed view
# - add new device (EpicsMotor, EpicsMotorECMC, EpicsSignal, CustomDevice)
# - remove device
# - rerun validation (with/without connect)
# IO actions
@SafeSlot()
def _load_file_action(self):
"""Action for the 'load' action to load a config from disk for the io_bundle of the toolbar."""
# Check if plugin repo is installed...
try:
plugin_path = plugin_repo_path()
plugin_name = plugin_package_name()
config_path = os.path.join(plugin_path, plugin_name, "device_configs")
except ValueError:
# Get the recovery config path as fallback
config_path = self._get_recovery_config_path()
logger.warning(
f"No plugin repository installed, fallback to recovery config path: {config_path}"
)
# Implement the file loading logic here
start_dir = os.path.abspath(config_path)
file_path, _ = QFileDialog.getOpenFileName(
self, caption="Select Config File", dir=start_dir
)
if file_path:
try:
config = yaml_load(file_path)
except Exception as e:
logger.error(f"Failed to load config from file {file_path}. Error: {e}")
return
self.device_table_view.set_device_config(
config
) # TODO ADD QDialog with 'replace', 'add' & 'cancel'
# TODO would we ever like to add the current config to an existing composition
@SafeSlot()
def _load_redis_action(self):
"""Action for the 'load_redis' action to load the current config from Redis for the io_bundle of the toolbar."""
reply = QMessageBox.question(
self,
"Load currently active config",
"Do you really want to flush the current config and reload?",
QMessageBox.Yes | QMessageBox.No,
QMessageBox.No,
)
if reply == QMessageBox.Yes:
cfg = {}
config_list = self.client.device_manager._get_redis_device_config()
for item in config_list:
k = item["name"]
item.pop("name")
cfg[k] = item
self.device_table_view.set_device_config(cfg)
else:
return
@SafeSlot()
def _safe_to_disk_action(self):
"""Action for the 'safe_to_disk' action to save the current config to disk."""
# Check if plugin repo is installed...
try:
config_path = self._get_recovery_config_path()
except ValueError:
# Get the recovery config path as fallback
config_path = os.path.abspath(os.path.expanduser("~"))
logger.warning(f"Failed to find recovery config path, fallback to: {config_path}")
# Implement the file loading logic here
file_path, _ = QFileDialog.getSaveFileName(
self, caption="Save Config File", dir=config_path
)
if file_path:
config = self.device_table_view.get_device_config()
with open(file_path, "w") as file:
file.write(yaml.dump(config))
# TODO add here logic, should be asyncronous, but probably block UI, and show a loading spinner. If failed, it should report..
@SafeSlot()
def _update_redis_action(self):
"""Action for the 'update_redis' action to update the current config in Redis."""
config = self.device_table_view.get_device_config()
reply = QMessageBox.question(
self,
"Not implemented yet",
"This feature has not been implemented yet, will be coming soon...!!",
QMessageBox.Cancel,
QMessageBox.Cancel,
)
# Table actions
@SafeSlot()
def _reset_composed_view(self):
"""Action for the 'reset_composed_view' action to reset the composed view."""
reply = QMessageBox.question(
self,
"Clear View",
"You are about to clear the current composed config view, please confirm...",
QMessageBox.Yes | QMessageBox.No,
QMessageBox.No,
)
if reply == QMessageBox.Yes:
self.device_table_view.clear_device_configs()
# TODO Here we would like to implement a custom popup view, that allows to add new devices
# We want to have a combobox to choose from EpicsMotor, EpicsMotorECMC, EpicsSignal, EpicsSignalRO, and maybe EpicsSignalWithRBV and custom Device
# For all default Epics devices, we would like to preselect relevant fields, and prompt them with the proper deviceConfig args already, i.e. 'prefix', 'read_pv', 'write_pv' etc..
# For custom Device, they should receive all options. It might be cool to get a side panel with docstring view of the class upon inspecting it to make it easier in case deviceConfig entries are required..
@SafeSlot()
def _add_device_action(self):
"""Action for the 'add_device' action to add a new device."""
# Implement the logic to add a new device
reply = QMessageBox.question(
self,
"Not implemented yet",
"This feature has not been implemented yet, will be coming soon...!!",
QMessageBox.Cancel,
QMessageBox.Cancel,
)
# TODO fix the device table remove actions. This is currently not working properly...
@SafeSlot()
def _remove_device_action(self):
"""Action for the 'remove_device' action to remove a device."""
reply = QMessageBox.question(
self,
"Not implemented yet",
"This feature has not been implemented yet, will be coming soon...!!",
QMessageBox.Cancel,
QMessageBox.Cancel,
)
# TODO implement proper logic for validation. We should also carefully review how these jobs update the table, and how we can cancel pending validations
# in case they are no longer relevant. We might want to 'block' the interactivity on the items for which validation runs with 'connect'!
@SafeSlot()
def _rerun_validation_action(self):
"""Action for the 'rerun_validation' action to rerun validation on selected devices."""
# Implement the logic to rerun validation on selected devices
reply = QMessageBox.question(
self,
"Not implemented yet",
"This feature has not been implemented yet, will be coming soon...!!",
QMessageBox.Cancel,
QMessageBox.Cancel,
)
####### Default view has to be done with setting up splitters ########
def set_default_view(self, horizontal_weights: list, vertical_weights: list):
"""Apply initial weights to every horizontal and vertical splitter.
Examples:
horizontal_weights = [1, 3, 2, 1]
vertical_weights = [3, 7] # top:bottom = 30:70
"""
splitters_h = []
splitters_v = []
for splitter in self.findChildren(QSplitter):
if splitter.orientation() == Qt.Horizontal:
splitters_h.append(splitter)
elif splitter.orientation() == Qt.Vertical:
splitters_v.append(splitter)
def apply_all():
for s in splitters_h:
set_splitter_weights(s, horizontal_weights)
for s in splitters_v:
set_splitter_weights(s, vertical_weights)
QTimer.singleShot(0, apply_all)
def set_stretch(self, *, horizontal=None, vertical=None):
"""Update splitter weights and re-apply to all splitters.
Accepts either a list/tuple of weights (e.g., [1,3,2,1]) or a role dict
for convenience: horizontal roles = {"left","center","right"},
vertical roles = {"top","bottom"}.
"""
def _coerce_h(x):
if x is None:
return None
if isinstance(x, (list, tuple)):
return list(map(float, x))
if isinstance(x, dict):
return [
float(x.get("left", 1)),
float(x.get("center", x.get("middle", 1))),
float(x.get("right", 1)),
]
return None
def _coerce_v(x):
if x is None:
return None
if isinstance(x, (list, tuple)):
return list(map(float, x))
if isinstance(x, dict):
return [float(x.get("top", 1)), float(x.get("bottom", 1))]
return None
h = _coerce_h(horizontal)
v = _coerce_v(vertical)
if h is None:
h = [1, 1, 1]
if v is None:
v = [1, 1]
self.set_default_view(h, v)
def _get_recovery_config_path(self) -> str:
"""Get the recovery config path from the log_writer config."""
# pylint: disable=protected-access
log_writer_config: BECClient = self.client._service_config.config.get("log_writer", {})
writer = DeviceConfigWriter(service_config=log_writer_config)
return os.path.abspath(os.path.expanduser(writer.get_recovery_directory()))
if __name__ == "__main__":
import sys
from copy import deepcopy
from bec_lib.bec_yaml_loader import yaml_load
from qtpy.QtWidgets import QApplication
from bec_widgets.widgets.utility.visual.dark_mode_button.dark_mode_button import DarkModeButton
app = QApplication(sys.argv)
w = QWidget()
l = QVBoxLayout()
w.setLayout(l)
apply_theme("dark")
button = DarkModeButton()
l.addWidget(button)
device_manager_view = DeviceManagerView()
l.addWidget(device_manager_view)
# config_path = "/Users/appel_c/work_psi_awi/bec_workspace/csaxs_bec/csaxs_bec/device_configs/first_light.yaml"
# cfg = yaml_load(config_path)
# cfg.update({"device_will_fail": {"name": "device_will_fail", "some_param": 1}})
# # config = device_manager_view.client.device_manager._get_redis_device_config()
# device_manager_view.device_table_view.set_device_config(cfg)
w.show()
w.setWindowTitle("Device Manager View")
w.resize(1920, 1080)
# developer_view.set_stretch(horizontal=[1, 3, 2], vertical=[5, 5]) #can be set during runtime
sys.exit(app.exec_())

View File

@@ -0,0 +1,110 @@
"""Top Level wrapper for device_manager widget"""
from __future__ import annotations
import os
from bec_lib.bec_yaml_loader import yaml_load
from bec_lib.logger import bec_logger
from bec_qthemes import material_icon
from qtpy import QtCore, QtWidgets
from bec_widgets.examples.device_manager_view.device_manager_view import DeviceManagerView
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.error_popups import SafeSlot
logger = bec_logger.logger
class DeviceManagerWidget(BECWidget, QtWidgets.QWidget):
def __init__(self, parent=None, client=None):
super().__init__(client=client, parent=parent)
self.stacked_layout = QtWidgets.QStackedLayout()
self.stacked_layout.setContentsMargins(0, 0, 0, 0)
self.stacked_layout.setSpacing(0)
self.stacked_layout.setStackingMode(QtWidgets.QStackedLayout.StackAll)
self.setLayout(self.stacked_layout)
# Add device manager view
self.device_manager_view = DeviceManagerView()
self.stacked_layout.addWidget(self.device_manager_view)
# Add overlay widget
self._overlay_widget = QtWidgets.QWidget(self)
self._customize_overlay()
self.stacked_layout.addWidget(self._overlay_widget)
self.stacked_layout.setCurrentWidget(self._overlay_widget)
def _customize_overlay(self):
self._overlay_widget.setStyleSheet(
"background: qlineargradient(x1:0, y1:0, x2:0, y2:1,stop:0 #ffffff, stop:1 #e0e0e0);"
)
self._overlay_widget.setAutoFillBackground(True)
self._overlay_layout = QtWidgets.QVBoxLayout()
self._overlay_layout.setAlignment(QtCore.Qt.AlignmentFlag.AlignCenter)
self._overlay_widget.setLayout(self._overlay_layout)
self._overlay_widget.setSizePolicy(
QtWidgets.QSizePolicy.Policy.Expanding, QtWidgets.QSizePolicy.Policy.Expanding
)
# Load current config
self.button_load_current_config = QtWidgets.QPushButton("Load Current Config")
icon = material_icon(icon_name="database", size=(24, 24), convert_to_pixmap=False)
self.button_load_current_config.setIcon(icon)
self._overlay_layout.addWidget(self.button_load_current_config)
self.button_load_current_config.clicked.connect(self._load_config_clicked)
# Load config from disk
self.button_load_config_from_file = QtWidgets.QPushButton("Load Config From File")
icon = material_icon(icon_name="folder", size=(24, 24), convert_to_pixmap=False)
self.button_load_config_from_file.setIcon(icon)
self._overlay_layout.addWidget(self.button_load_config_from_file)
self.button_load_config_from_file.clicked.connect(self._load_config_from_file_clicked)
self._overlay_widget.setVisible(True)
def _load_config_from_file_clicked(self):
"""Handle click on 'Load Config From File' button."""
start_dir = os.path.expanduser("~")
file_path, _ = QtWidgets.QFileDialog.getOpenFileName(
self, caption="Select Config File", dir=start_dir
)
if file_path:
self._load_config_from_file(file_path)
def _load_config_from_file(self, file_path: str):
try:
config = yaml_load(file_path)
except Exception as e:
logger.error(f"Failed to load config from file {file_path}. Error: {e}")
return
config_list = []
for name, cfg in config.items():
config_list.append(cfg)
config_list[-1]["name"] = name
self.device_manager_view.device_table_view.set_device_config(config_list)
# self.device_manager_view.ophyd_test.on_device_config_update(config)
self.stacked_layout.setCurrentWidget(self.device_manager_view)
@SafeSlot()
def _load_config_clicked(self):
"""Handle click on 'Load Current Config' button."""
config = self.client.device_manager._get_redis_device_config()
config.append({"name": "wrong_device", "some_value": 1})
self.device_manager_view.device_table_view.set_device_config(config)
# self.device_manager_view.ophyd_test.on_device_config_update(config)
self.stacked_layout.setCurrentWidget(self.device_manager_view)
if __name__ == "__main__":
import sys
from qtpy.QtWidgets import QApplication
app = QApplication(sys.argv)
device_manager = DeviceManagerWidget()
# config = device_manager.client.device_manager._get_redis_device_config()
# device_manager.device_table_view.set_device_config(config)
device_manager.show()
device_manager.setWindowTitle("Device Manager View")
device_manager.resize(1600, 1200)
# developer_view.set_stretch(horizontal=[1, 3, 2], vertical=[5, 5]) #can be set during runtime
sys.exit(app.exec_())

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,4 @@
from .device_table_view import DeviceTableView
from .dm_config_view import DMConfigView
from .dm_docstring_view import DocstringView
from .dm_ophyd_test import DMOphydTest

View File

@@ -3,16 +3,18 @@
from __future__ import annotations
import copy
import json
import time
from bec_lib.logger import bec_logger
from bec_qthemes import material_icon
from qtpy import QtCore, QtGui, QtWidgets
from thefuzz import fuzz
from bec_widgets.utils.bec_signal_proxy import BECSignalProxy
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.colors import get_accent_colors
from bec_widgets.utils.colors import get_accent_colors, get_theme_palette
from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.widgets.control.device_manager.components.dm_ophyd_test import ValidationStatus
logger = bec_logger.logger
@@ -23,34 +25,32 @@ FUZZY_SEARCH_THRESHOLD = 80
class DictToolTipDelegate(QtWidgets.QStyledItemDelegate):
"""Delegate that shows all key-value pairs of a rows's data as a YAML-like tooltip."""
@staticmethod
def dict_to_str(d: dict) -> str:
"""Convert a dictionary to a formatted string."""
return json.dumps(d, indent=4)
def helpEvent(self, event, view, option, index):
"""Override to show tooltip when hovering."""
if event.type() != QtCore.QEvent.ToolTip:
return super().helpEvent(event, view, option, index)
model: DeviceFilterProxyModel = index.model()
model_index = model.mapToSource(index)
row_dict = model.sourceModel().row_data(model_index)
row_dict.pop("description", None)
QtWidgets.QToolTip.showText(event.globalPos(), self.dict_to_str(row_dict), view)
row_dict = model.sourceModel().get_row_data(model_index)
description = row_dict.get("description", "")
QtWidgets.QToolTip.showText(event.globalPos(), description, view)
return True
class CenterCheckBoxDelegate(DictToolTipDelegate):
"""Custom checkbox delegate to center checkboxes in table cells."""
def __init__(self, parent=None):
def __init__(self, parent=None, colors=None):
super().__init__(parent)
colors = get_accent_colors()
self._colors = colors if colors else get_accent_colors()
self._icon_checked = material_icon(
"check_box", size=QtCore.QSize(16, 16), color=colors.default
"check_box", size=QtCore.QSize(16, 16), color=self._colors.default, filled=True
)
self._icon_unchecked = material_icon(
"check_box_outline_blank", size=QtCore.QSize(16, 16), color=colors.default
"check_box_outline_blank",
size=QtCore.QSize(16, 16),
color=self._colors.default,
filled=True,
)
def apply_theme(self, theme: str | None = None):
@@ -81,9 +81,51 @@ class CenterCheckBoxDelegate(DictToolTipDelegate):
return model.setData(index, new_state, QtCore.Qt.CheckStateRole)
class DeviceValidatedDelegate(DictToolTipDelegate):
"""Custom delegate for displaying validated device configurations."""
def __init__(self, parent=None, colors=None):
super().__init__(parent)
self._colors = colors if colors else get_accent_colors()
self._icons = {
ValidationStatus.PENDING: material_icon(
icon_name="circle", size=(12, 12), color=self._colors.default, filled=True
),
ValidationStatus.VALID: material_icon(
icon_name="circle", size=(12, 12), color=self._colors.success, filled=True
),
ValidationStatus.FAILED: material_icon(
icon_name="circle", size=(12, 12), color=self._colors.emergency, filled=True
),
}
def apply_theme(self, theme: str | None = None):
colors = get_accent_colors()
for status, icon in self._icons.items():
icon.setColor(colors[status])
def paint(self, painter, option, index):
status = index.model().data(index, QtCore.Qt.DisplayRole)
if status is None:
return super().paint(painter, option, index)
pixmap = self._icons.get(status)
if pixmap:
rect = option.rect
pix_rect = pixmap.rect()
pix_rect.moveCenter(rect.center())
painter.drawPixmap(pix_rect.topLeft(), pixmap)
super().paint(painter, option, index)
class WrappingTextDelegate(DictToolTipDelegate):
"""Custom delegate for wrapping text in table cells."""
def __init__(self, table: BECTableView, parent=None):
super().__init__(parent)
self._table = table
def paint(self, painter, option, index):
text = index.model().data(index, QtCore.Qt.DisplayRole)
if not text:
@@ -97,12 +139,14 @@ class WrappingTextDelegate(DictToolTipDelegate):
def sizeHint(self, option, index):
text = str(index.model().data(index, QtCore.Qt.DisplayRole) or "")
# if not text:
# return super().sizeHint(option, index)
column_width = self._table.columnWidth(index.column()) - 8 # -4 & 4
# Use the actual column width
table = index.model().parent() # or store reference to QTableView
column_width = table.columnWidth(index.column()) # - 8
# Avoid pathological heights for too-narrow columns
min_width = option.fontMetrics.averageCharWidth() * 4
if column_width < min_width:
fm = QtGui.QFontMetrics(option.font)
elided = fm.elidedText(text, QtCore.Qt.ElideRight, column_width)
return QtCore.QSize(column_width, fm.height() + 4)
doc = QtGui.QTextDocument()
doc.setDefaultFont(option.font)
@@ -110,8 +154,25 @@ class WrappingTextDelegate(DictToolTipDelegate):
doc.setPlainText(text)
layout_height = doc.documentLayout().documentSize().height()
height = int(layout_height) + 4 # Needs some extra padding, otherwise it gets cut off
return QtCore.QSize(column_width, height)
return QtCore.QSize(column_width, int(layout_height) + 4)
# def sizeHint(self, option, index):
# text = str(index.model().data(index, QtCore.Qt.DisplayRole) or "")
# # if not text:
# # return super().sizeHint(option, index)
# # Use the actual column width
# table = index.model().parent() # or store reference to QTableView
# column_width = table.columnWidth(index.column()) # - 8
# doc = QtGui.QTextDocument()
# doc.setDefaultFont(option.font)
# doc.setTextWidth(column_width)
# doc.setPlainText(text)
# layout_height = doc.documentLayout().documentSize().height()
# height = int(layout_height) + 4 # Needs some extra padding, otherwise it gets cut off
# return QtCore.QSize(column_width, height)
class DeviceTableModel(QtCore.QAbstractTableModel):
@@ -121,17 +182,22 @@ class DeviceTableModel(QtCore.QAbstractTableModel):
Sort logic is implemented directly on the data of the table view.
"""
def __init__(self, device_config: list[dict] | None = None, parent=None):
device_configs_added = QtCore.Signal(dict) # Dict[str, dict] of configs that were added
devices_removed = QtCore.Signal(list) # List of strings with device names that were removed
def __init__(self, parent=None):
super().__init__(parent)
self._device_config = device_config or []
self._device_config: dict[str, dict] = {}
self._list_items: list[dict] = []
self._validation_status: dict[str, ValidationStatus] = {}
self.headers = [
"",
"name",
"deviceClass",
"readoutPriority",
"deviceTags",
"enabled",
"readOnly",
"deviceTags",
"description",
]
self._checkable_columns_enabled = {"enabled": True, "readOnly": True}
@@ -140,7 +206,7 @@ class DeviceTableModel(QtCore.QAbstractTableModel):
###############################################
def rowCount(self, parent=QtCore.QModelIndex()) -> int:
return len(self._device_config)
return len(self._list_items)
def columnCount(self, parent=QtCore.QModelIndex()) -> int:
return len(self.headers)
@@ -150,25 +216,32 @@ class DeviceTableModel(QtCore.QAbstractTableModel):
return self.headers[section]
return None
def row_data(self, index: QtCore.QModelIndex) -> dict:
def get_row_data(self, index: QtCore.QModelIndex) -> dict:
"""Return the row data for the given index."""
if not index.isValid():
return {}
return copy.deepcopy(self._device_config[index.row()])
return copy.deepcopy(self._list_items[index.row()])
def data(self, index, role=QtCore.Qt.DisplayRole):
"""Return data for the given index and role."""
if not index.isValid():
return None
row, col = index.row(), index.column()
if col == 0 and role == QtCore.Qt.DisplayRole: # QtCore.Qt.DisplayRole:
dev_name = self._list_items[row].get("name", "")
return self._validation_status.get(dev_name, ValidationStatus.PENDING)
key = self.headers[col]
value = self._device_config[row].get(key)
value = self._list_items[row].get(key)
if role == QtCore.Qt.DisplayRole:
if key in ("enabled", "readOnly"):
return bool(value)
if key == "deviceTags":
return ", ".join(str(tag) for tag in value) if value else ""
if key == "deviceClass":
return str(value).split(".")[-1]
return str(value) if value is not None else ""
if role == QtCore.Qt.CheckStateRole and key in ("enabled", "readOnly"):
return QtCore.Qt.Checked if value else QtCore.Qt.Unchecked
@@ -215,7 +288,7 @@ class DeviceTableModel(QtCore.QAbstractTableModel):
if key in ("enabled", "readOnly") and role == QtCore.Qt.CheckStateRole:
if not self._checkable_columns_enabled.get(key, True):
return False # ignore changes if column is disabled
self._device_config[row][key] = value == QtCore.Qt.Checked
self._list_items[row][key] = value == QtCore.Qt.Checked
self.dataChanged.emit(index, index, [QtCore.Qt.CheckStateRole])
return True
return False
@@ -224,87 +297,115 @@ class DeviceTableModel(QtCore.QAbstractTableModel):
############ Public methods ########
####################################
def get_device_config(self) -> list[dict]:
"""Return the current device config (with checkbox updates applied)."""
def get_device_config(self) -> dict[str, dict]:
"""Method to get the device configuration."""
return self._device_config
def set_checkbox_enabled(self, column_name: str, enabled: bool):
def add_device_configs(self, device_configs: dict[str, dict]):
"""
Enable/Disable the checkbox column.
Add devices to the model.
Args:
column_name (str): The name of the column to modify.
enabled (bool): Whether the checkbox should be enabled or disabled.
device_configs (dict[str, dict]): A dictionary of device configurations to add.
"""
if column_name in self._checkable_columns_enabled:
self._checkable_columns_enabled[column_name] = enabled
col = self.headers.index(column_name)
top_left = self.index(0, col)
bottom_right = self.index(self.rowCount() - 1, col)
self.dataChanged.emit(
top_left, bottom_right, [QtCore.Qt.CheckStateRole, QtCore.Qt.DisplayRole]
)
already_in_list = []
for k, cfg in device_configs.items():
if k in self._device_config:
logger.warning(f"Device {k} already exists in the model.")
already_in_list.append(k)
continue
self._device_config[k] = cfg
new_list_cfg = copy.deepcopy(cfg)
new_list_cfg["name"] = k
row = len(self._list_items)
self.beginInsertRows(QtCore.QModelIndex(), row, row)
self._list_items.append(new_list_cfg)
self.endInsertRows()
for k in already_in_list:
device_configs.pop(k)
self.device_configs_added.emit(device_configs)
def set_device_config(self, device_config: list[dict]):
def set_device_config(self, device_configs: dict[str, dict]):
"""
Replace the device config.
Args:
device_config (list[dict]): The new device config to set.
device_config (dict[str, dict]): The new device config to set.
"""
diff_names = set(device_configs.keys()) - set(self._device_config.keys())
self.beginResetModel()
self._device_config = list(device_config)
self._device_config.clear()
self._list_items.clear()
for k, cfg in device_configs.items():
self._device_config[k] = cfg
new_list_cfg = copy.deepcopy(cfg)
new_list_cfg["name"] = k
self._list_items.append(new_list_cfg)
self.endResetModel()
self.devices_removed.emit(diff_names)
self.device_configs_added.emit(device_configs)
@SafeSlot(dict)
def add_device(self, device: dict):
def remove_device_configs(self, device_configs: dict[str, dict]):
"""
Add an extra device to the device config at the bottom.
Remove devices from the model.
Args:
device (dict): The device configuration to add.
device_configs (dict[str, dict]): A dictionary of device configurations to remove.
"""
row = len(self._device_config)
self.beginInsertRows(QtCore.QModelIndex(), row, row)
self._device_config.append(device)
self.endInsertRows()
@SafeSlot(int)
def remove_device_by_row(self, row: int):
"""
Remove one device row by index. This maps to the row to the source of the data model
Args:
row (int): The index of the device row to remove.
"""
if 0 <= row < len(self._device_config):
removed = []
for k in device_configs.keys():
if k not in self._device_config:
logger.warning(f"Device {k} does not exist in the model.")
continue
new_cfg = self._device_config.pop(k)
new_cfg["name"] = k
row = self._list_items.index(new_cfg)
self.beginRemoveRows(QtCore.QModelIndex(), row, row)
self._device_config.pop(row)
self._list_items.pop(row)
self.endRemoveRows()
removed.append(k)
self.devices_removed.emit(removed)
@SafeSlot(list)
def remove_devices_by_rows(self, rows: list[int]):
def clear_table(self):
"""
Remove multiple device rows by their indices.
Clear the table.
"""
device_names = list(self._device_config.keys())
self.beginResetModel()
self._device_config.clear()
self._list_items.clear()
self.endResetModel()
self.devices_removed.emit(device_names)
def update_validation_status(self, device_name: str, status: int | ValidationStatus):
"""
Handle device status changes.
Args:
rows (list[int]): The indices of the device rows to remove.
device_name (str): The name of the device.
status (int): The new status of the device.
"""
for row in sorted(rows, reverse=True):
self.remove_device_by_row(row)
@SafeSlot(str)
def remove_device_by_name(self, name: str):
"""
Remove one device row by name.
Args:
name (str): The name of the device to remove.
"""
for row, device in enumerate(self._device_config):
if device.get("name") == name:
self.remove_device_by_row(row)
if isinstance(status, int):
status = ValidationStatus(status)
if device_name not in self._device_config:
logger.warning(
f"Device {device_name} not found in device_config dict {self._device_config}"
)
return
self._validation_status[device_name] = status
row = None
for ii, item in enumerate(self._list_items):
if item["name"] == device_name:
row = ii
break
if row is None:
logger.warning(
f"Device {device_name} not found in device_status dict {self._validation_status}"
)
return
# Emit dataChanged for column 0 (status column)
index = self.index(row, 0)
self.dataChanged.emit(index, index, [QtCore.Qt.DisplayRole])
class BECTableView(QtWidgets.QTableView):
@@ -324,12 +425,7 @@ class BECTableView(QtWidgets.QTableView):
if not proxy_indexes:
return
# Get unique rows (proxy indices) in reverse order so removal indexes stay valid
proxy_rows = sorted({idx.row() for idx in proxy_indexes}, reverse=True)
# Map to source model rows
source_rows = [
self.model().mapToSource(self.model().index(row, 0)).row() for row in proxy_rows
]
source_rows = self._get_source_rows(proxy_indexes)
model: DeviceTableModel = self.model().sourceModel() # access underlying model
# Delegate confirmation and removal to helper
@@ -337,14 +433,28 @@ class BECTableView(QtWidgets.QTableView):
if not removed:
return
def _get_source_rows(self, proxy_indexes: list[QtWidgets.QModelIndex]) -> list[int]:
"""
Map proxy model indices to source model row indices.
Args:
proxy_indexes (list[QModelIndex]): List of proxy model indices.
Returns:
list[int]: List of source model row indices.
"""
proxy_rows = sorted({idx for idx in proxy_indexes}, reverse=True)
source_rows = [self.model().mapToSource(idx).row() for idx in proxy_rows]
return list(set(source_rows))
def _confirm_and_remove_rows(self, model: DeviceTableModel, source_rows: list[int]) -> bool:
"""
Prompt the user to confirm removal of rows and remove them from the model if accepted.
Returns True if rows were removed, False otherwise.
"""
cfg = model.get_device_config()
names = [str(cfg[r].get("name", "<unknown>")) for r in sorted(source_rows)]
configs = [model._list_items[r] for r in sorted(source_rows)]
names = [cfg.get("name", "<unknown>") for cfg in configs]
msg = QtWidgets.QMessageBox(self)
msg.setIcon(QtWidgets.QMessageBox.Warning)
@@ -359,8 +469,8 @@ class BECTableView(QtWidgets.QTableView):
res = msg.exec_()
if res == QtWidgets.QMessageBox.Ok:
model.remove_devices_by_rows(source_rows)
# TODO add signal for removed devices
configs_to_be_removed = {model._device_config[name] for name in names}
model.remove_device_configs(configs_to_be_removed)
return True
return False
@@ -372,7 +482,7 @@ class DeviceFilterProxyModel(QtCore.QSortFilterProxyModel):
self._hidden_rows = set()
self._filter_text = ""
self._enable_fuzzy = True
self._filter_columns = [0, 1] # name and deviceClass for search
self._filter_columns = [1, 2] # name and deviceClass for search
def hide_rows(self, row_indices: list[int]):
"""
@@ -436,9 +546,12 @@ class DeviceFilterProxyModel(QtCore.QSortFilterProxyModel):
class DeviceTableView(BECWidget, QtWidgets.QWidget):
"""Device Table View for the device manager."""
selected_device = QtCore.Signal(dict) # Selected device configuration dict[str,dict]
device_configs_added = QtCore.Signal(dict) # Dict[str, dict] of configs that were added
devices_removed = QtCore.Signal(list) # List of strings with device names that were removed
RPC = False
PLUGIN = False
devices_removed = QtCore.Signal(list)
def __init__(self, parent=None, client=None):
super().__init__(client=client, parent=parent, theme_update=True)
@@ -455,6 +568,10 @@ class DeviceTableView(BECWidget, QtWidgets.QWidget):
self.layout.addLayout(self.search_controls)
self.layout.addWidget(self.table)
# Connect signals
self._model.devices_removed.connect(self.devices_removed.emit)
self._model.device_configs_added.connect(self.device_configs_added.emit)
def _setup_search(self):
"""Create components related to the search functionality"""
@@ -495,137 +612,199 @@ class DeviceTableView(BECWidget, QtWidgets.QWidget):
"""Setup the table view."""
# Model + Proxy
self.table = BECTableView(self)
self.model = DeviceTableModel(parent=self.table)
self._model = DeviceTableModel(parent=self.table)
self.proxy = DeviceFilterProxyModel(parent=self.table)
self.proxy.setSourceModel(self.model)
self.proxy.setSourceModel(self._model)
self.table.setModel(self.proxy)
self.table.setSortingEnabled(True)
# Delegates
self.checkbox_delegate = CenterCheckBoxDelegate(self.table)
colors = get_accent_colors()
self.checkbox_delegate = CenterCheckBoxDelegate(self.table, colors=colors)
self.wrap_delegate = WrappingTextDelegate(self.table)
self.tool_tip_delegate = DictToolTipDelegate(self.table)
self.table.setItemDelegateForColumn(0, self.tool_tip_delegate) # name
self.table.setItemDelegateForColumn(1, self.tool_tip_delegate) # deviceClass
self.table.setItemDelegateForColumn(2, self.tool_tip_delegate) # readoutPriority
self.table.setItemDelegateForColumn(3, self.checkbox_delegate) # enabled
self.table.setItemDelegateForColumn(4, self.checkbox_delegate) # readOnly
self.table.setItemDelegateForColumn(5, self.wrap_delegate) # deviceTags
self.table.setItemDelegateForColumn(6, self.wrap_delegate) # description
self.validated_delegate = DeviceValidatedDelegate(self.table, colors=colors)
self.table.setItemDelegateForColumn(0, self.validated_delegate) # ValidationStatus
self.table.setItemDelegateForColumn(1, self.tool_tip_delegate) # name
self.table.setItemDelegateForColumn(2, self.tool_tip_delegate) # deviceClass
self.table.setItemDelegateForColumn(3, self.tool_tip_delegate) # readoutPriority
self.table.setItemDelegateForColumn(4, self.wrap_delegate) # deviceTags
self.table.setItemDelegateForColumn(5, self.checkbox_delegate) # enabled
self.table.setItemDelegateForColumn(6, self.checkbox_delegate) # readOnly
# Column resize policies
# TODO maybe we need here a flexible header options as deviceClass
# may get quite long for beamlines plugin repos
header = self.table.horizontalHeader()
header.setSectionResizeMode(0, QtWidgets.QHeaderView.ResizeToContents) # name
header.setSectionResizeMode(1, QtWidgets.QHeaderView.ResizeToContents) # deviceClass
header.setSectionResizeMode(2, QtWidgets.QHeaderView.ResizeToContents) # readoutPriority
header.setSectionResizeMode(3, QtWidgets.QHeaderView.Fixed) # enabled
header.setSectionResizeMode(4, QtWidgets.QHeaderView.Fixed) # readOnly
# TODO maybe better stretch...
header.setSectionResizeMode(5, QtWidgets.QHeaderView.ResizeToContents) # deviceTags
header.setSectionResizeMode(6, QtWidgets.QHeaderView.Stretch) # description
self.table.setColumnWidth(3, 82)
self.table.setColumnWidth(4, 82)
header.setSectionResizeMode(0, QtWidgets.QHeaderView.Fixed) # ValidationStatus
header.setSectionResizeMode(1, QtWidgets.QHeaderView.ResizeToContents) # name
header.setSectionResizeMode(2, QtWidgets.QHeaderView.ResizeToContents) # deviceClass
header.setSectionResizeMode(3, QtWidgets.QHeaderView.ResizeToContents) # readoutPriority
header.setSectionResizeMode(4, QtWidgets.QHeaderView.Stretch) # deviceTags
header.setSectionResizeMode(5, QtWidgets.QHeaderView.Fixed) # enabled
header.setSectionResizeMode(6, QtWidgets.QHeaderView.Fixed) # readOnly
self.table.setColumnWidth(0, 25)
self.table.setColumnWidth(5, 70)
self.table.setColumnWidth(6, 70)
# Ensure column widths stay fixed
header.setMinimumSectionSize(70)
header.setMinimumSectionSize(25)
header.setDefaultSectionSize(90)
# Enable resizing of column
header.sectionResized.connect(self.on_table_resized)
self._geometry_resize_proxy = BECSignalProxy(
header.geometriesChanged, rateLimit=10, slot=self._on_table_resized
)
# Selection behavior
self.table.setSelectionBehavior(QtWidgets.QAbstractItemView.SelectRows)
self.table.setSelectionMode(QtWidgets.QAbstractItemView.ExtendedSelection)
# Connect to selection model to get selection changes
self.table.selectionModel().selectionChanged.connect(self._on_selection_changed)
self.table.horizontalHeader().setHighlightSections(False)
# QtCore.QTimer.singleShot(0, lambda: header.sectionResized.emit(0, 0, 0))
def device_config(self) -> list[dict]:
def get_device_config(self) -> dict[str, dict]:
"""Get the device config."""
return self.model.get_device_config()
return self._model.get_device_config()
def apply_theme(self, theme: str | None = None):
self.checkbox_delegate.apply_theme(theme)
self.validated_delegate.apply_theme(theme)
######################################
########### Slot API #################
######################################
@SafeSlot(int, int, int)
def on_table_resized(self, column, old_width, new_width):
@SafeSlot()
def _on_table_resized(self, *args):
"""Handle changes to the table column resizing."""
if column != len(self.model.headers) - 1:
option = QtWidgets.QStyleOptionViewItem()
model = self.table.model()
for row in range(model.rowCount()):
index = model.index(row, 4)
height = self.wrap_delegate.sizeHint(option, index).height()
self.table.setRowHeight(row, height)
@SafeSlot(QtCore.QItemSelection, QtCore.QItemSelection)
def _on_selection_changed(
self, selected: QtCore.QItemSelection, deselected: QtCore.QItemSelection
) -> None:
"""
Handle selection changes in the device table.
Args:
selected (QtCore.QItemSelection): The selected items.
deselected (QtCore.QItemSelection): The deselected items.
"""
# TODO also hook up logic if a config update is propagated from somewhere!
# selected_indexes = selected.indexes()
selected_indexes = self.table.selectionModel().selectedIndexes()
if not selected_indexes:
return
for row in range(self.table.model().rowCount()):
index = self.table.model().index(row, column)
delegate = self.table.itemDelegate(index)
option = QtWidgets.QStyleOptionViewItem()
height = delegate.sizeHint(option, index).height()
self.table.setRowHeight(row, height)
source_indexes = [self.proxy.mapToSource(idx) for idx in selected_indexes]
source_rows = {idx.row() for idx in source_indexes}
configs = [copy.deepcopy(self._model._list_items[r]) for r in sorted(source_rows)]
names = [cfg.pop("name") for cfg in configs]
selected_cfgs = {name: cfg for name, cfg in zip(names, configs)}
self.selected_device.emit(selected_cfgs)
######################################
##### Ext. Slot API #################
######################################
@SafeSlot(list)
def set_device_config(self, config: list[dict]):
@SafeSlot(dict)
def set_device_config(self, device_configs: dict[str, dict]):
"""
Set the device config.
Args:
config (list[dict]): The device config to set.
config (dict[str,dict]): The device config to set.
"""
self.model.set_device_config(config)
self._model.set_device_config(device_configs)
@SafeSlot()
def clear_device_config(self):
"""
Clear the device config.
"""
self.model.set_device_config([])
def clear_device_configs(self):
"""Clear the device configs."""
self._model.clear_table()
@SafeSlot(dict)
def add_device(self, device: dict):
def add_device_configs(self, device_configs: dict[str, dict]):
"""
Add a device to the config.
Add devices to the config.
Args:
device (dict): The device to add.
device_configs (dict[str, dict]): The device configs to add.
"""
self.model.add_device(device)
self._model.add_device_configs(device_configs)
@SafeSlot(dict)
def remove_device_configs(self, device_configs: dict[str, dict]):
"""
Remove devices from the config.
Args:
device_configs (dict[str, dict]): The device configs to remove.
"""
self._model.remove_device_configs(device_configs)
@SafeSlot(int)
@SafeSlot(str)
def remove_device(self, dev: int | str):
def remove_device(self, device_name: str):
"""
Remove the device from the config either by row id, or device name.
Remove a device from the config.
Args:
dev (int | str): The device to remove, either by row id or device name.
device_name (str): The name of the device to remove.
"""
if isinstance(dev, int):
# TODO test this properly, check with proxy index and source index
# Use the proxy model to map to the correct row
model_source_index = self.table.model().mapToSource(self.table.model().index(dev, 0))
self.model.remove_device_by_row(model_source_index.row())
return
if isinstance(dev, str):
self.model.remove_device_by_name(dev)
cfg = self._model._device_config.get(device_name, None)
if cfg is None:
logger.warning(f"Device {device_name} not found in device_config dict")
return
self._model.remove_device_configs({device_name: cfg})
@SafeSlot(str, int)
def update_device_validation(
self, device_name: str, validation_status: int | ValidationStatus
) -> None:
"""
Update the validation status of a device.
Args:
device_name (str): The name of the device.
validation_status (int | ValidationStatus): The new validation status.
"""
self._model.update_validation_status(device_name, validation_status)
if __name__ == "__main__":
import sys
import numpy as np
from qtpy.QtWidgets import QApplication
app = QApplication(sys.argv)
widget = QtWidgets.QWidget()
layout = QtWidgets.QVBoxLayout(widget)
layout.setContentsMargins(0, 0, 0, 0)
window = DeviceTableView()
layout.addWidget(window)
# QPushButton
button = QtWidgets.QPushButton("Test status_update")
layout.addWidget(button)
def _button_clicked():
names = list(window._model._device_config.keys())
for name in names:
window.update_device_validation(
name, ValidationStatus.VALID if np.random.rand() > 0.5 else ValidationStatus.FAILED
)
button.clicked.connect(_button_clicked)
# pylint: disable=protected-access
config = window.client.device_manager._get_redis_device_config()
window.set_device_config(config)
window.show()
names = [cfg.pop("name") for cfg in config]
config_dict = {name: cfg for name, cfg in zip(names, config)}
window.set_device_config(config_dict)
widget.show()
sys.exit(app.exec_())

View File

@@ -0,0 +1,83 @@
"""Module with a config view for the device manager."""
from __future__ import annotations
import traceback
import yaml
from bec_lib.logger import bec_logger
from qtpy import QtCore, QtWidgets
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.colors import get_accent_colors, get_theme_palette
from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.widgets.editors.monaco.monaco_widget import MonacoWidget
logger = bec_logger.logger
class DMConfigView(BECWidget, QtWidgets.QWidget):
def __init__(self, parent=None, client=None):
super().__init__(client=client, parent=parent, theme_update=True)
self.stacked_layout = QtWidgets.QStackedLayout()
self.stacked_layout.setContentsMargins(0, 0, 0, 0)
self.stacked_layout.setSpacing(0)
self.setLayout(self.stacked_layout)
# Monaco widget
self.monaco_editor = MonacoWidget()
self._customize_monaco()
self.stacked_layout.addWidget(self.monaco_editor)
self._overlay_widget = QtWidgets.QLabel(text="Select single device to show config")
self._customize_overlay()
self.stacked_layout.addWidget(self._overlay_widget)
self.stacked_layout.setCurrentWidget(self._overlay_widget)
def _customize_monaco(self):
self.monaco_editor.set_language("yaml")
self.monaco_editor.set_vim_mode_enabled(False)
self.monaco_editor.set_minimap_enabled(False)
# self.monaco_editor.setFixedHeight(600)
self.monaco_editor.set_readonly(True)
self.monaco_editor.editor.set_scroll_beyond_last_line_enabled(False)
self.monaco_editor.editor.set_line_numbers_mode("off")
def _customize_overlay(self):
self._overlay_widget.setAlignment(QtCore.Qt.AlignmentFlag.AlignCenter)
self._overlay_widget.setAutoFillBackground(True)
self._overlay_widget.setSizePolicy(
QtWidgets.QSizePolicy.Policy.Expanding, QtWidgets.QSizePolicy.Policy.Expanding
)
@SafeSlot(dict)
def on_select_config(self, device: dict):
"""Handle selection of a device from the device table."""
if len(device) != 1:
text = ""
self.stacked_layout.setCurrentWidget(self._overlay_widget)
else:
try:
text = yaml.dump(device, default_flow_style=False)
self.stacked_layout.setCurrentWidget(self.monaco_editor)
except Exception:
content = traceback.format_exc()
logger.error(f"Error converting device to YAML:\n{content}")
text = ""
self.stacked_layout.setCurrentWidget(self._overlay_widget)
self.monaco_editor.set_readonly(False) # Enable editing
text = text.rstrip()
self.monaco_editor.set_text(text)
self.monaco_editor.set_readonly(True) # Disable editing again
if __name__ == "__main__":
import sys
from qtpy.QtWidgets import QApplication
app = QApplication(sys.argv)
config_view = DMConfigView()
config_view.show()
sys.exit(app.exec_())

View File

@@ -0,0 +1,128 @@
"""Module to visualize the docstring of a device class."""
from __future__ import annotations
import inspect
import re
import traceback
from bec_lib.logger import bec_logger
from bec_lib.plugin_helper import get_plugin_class, plugin_package_name
from bec_lib.utils.rpc_utils import rgetattr
from qtpy import QtCore, QtWidgets
from bec_widgets.utils.error_popups import SafeSlot
logger = bec_logger.logger
try:
import ophyd
import ophyd_devices
READY_TO_VIEW = True
except ImportError:
logger.warning(f"Optional dependencies not available: {ImportError}")
ophyd_devices = None
ophyd = None
class DocstringView(QtWidgets.QTextEdit):
def __init__(self, parent: QtWidgets.QWidget | None = None):
super().__init__(parent)
self.setReadOnly(True)
self.setFocusPolicy(QtCore.Qt.NoFocus)
if not READY_TO_VIEW:
self._set_text("Ophyd or ophyd_devices not installed, cannot show docstrings.")
self.setEnabled(False)
return
def _format_docstring(self, doc: str | None) -> str:
if not doc:
return "<i>No docstring available.</i>"
# Escape HTML
doc = doc.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")
# Remove leading/trailing blank lines from the entire docstring
lines = [line.rstrip() for line in doc.splitlines()]
while lines and lines[0].strip() == "":
lines.pop(0)
while lines and lines[-1].strip() == "":
lines.pop()
doc = "\n".join(lines)
# Improved regex: match section header + all following indented lines
section_regex = re.compile(
r"(?m)^(Parameters|Args|Returns|Examples|Attributes|Raises)\b(?:\n([ \t]+.*))*",
re.MULTILINE,
)
def strip_section(match: re.Match) -> str:
# Capture all lines in the match
block = match.group(0)
lines = block.splitlines()
# Remove leading/trailing empty lines within the section
lines = [line for line in lines if line.strip() != ""]
return "\n".join(lines)
doc = section_regex.sub(strip_section, doc)
# Highlight section titles
doc = re.sub(
r"(?m)^(Parameters|Args|Returns|Examples|Attributes|Raises)\b", r"<b>\1</b>", doc
)
# Convert indented blocks to <pre> and strip leading/trailing newlines
def pre_block(match: re.Match) -> str:
text = match.group(0).strip("\n")
return f"<pre>{text}</pre>"
doc = re.sub(r"(?m)(?:\n[ \t]+.*)+", pre_block, doc)
# Replace remaining newlines with <br> and collapse multiple <br>
doc = doc.replace("\n", "<br>")
doc = re.sub(r"(<br>)+", r"<br>", doc)
doc = doc.strip("<br>")
return f"<div style='font-family: sans-serif; font-size: 12pt;'>{doc}</div>"
def _set_text(self, text: str):
self.setReadOnly(False)
self.setMarkdown(text)
# self.setHtml(self._format_docstring(text))
self.setReadOnly(True)
@SafeSlot(dict)
def on_select_config(self, device: dict):
if len(device) != 1:
self._set_text("")
return
k = next(iter(device))
device_class = device[k].get("deviceClass", "")
self.set_device_class(device_class)
@SafeSlot(str)
def set_device_class(self, device_class_str: str) -> None:
docstring = ""
if not READY_TO_VIEW:
return
try:
module_cls = get_plugin_class(device_class_str, [ophyd_devices, ophyd])
docstring = inspect.getdoc(module_cls)
self._set_text(docstring or "No docstring available.")
except Exception:
content = traceback.format_exc()
logger.error(f"Error retrieving docstring for {device_class_str}: {content}")
self._set_text(f"Error retrieving docstring for {device_class_str}")
if __name__ == "__main__":
import sys
from qtpy.QtWidgets import QApplication
app = QApplication(sys.argv)
config_view = DocstringView()
config_view.set_device_class("ophyd_devices.sim.sim_camera.SimCamera")
config_view.show()
sys.exit(app.exec_())

View File

@@ -0,0 +1,414 @@
"""Module to run a static tests for devices from a yaml config."""
from __future__ import annotations
import enum
import re
import traceback
from html import escape
from typing import TYPE_CHECKING
import bec_lib
from bec_lib.logger import bec_logger
from bec_qthemes import material_icon
from ophyd import status
from qtpy import QtCore, QtGui, QtWidgets
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.colors import get_accent_colors
from bec_widgets.utils.error_popups import SafeProperty, SafeSlot
from bec_widgets.widgets.editors.web_console.web_console import WebConsole
from bec_widgets.widgets.utility.spinner.spinner import SpinnerWidget
READY_TO_TEST = False
logger = bec_logger.logger
try:
import bec_server
import ophyd_devices
READY_TO_TEST = True
except ImportError:
logger.warning(f"Optional dependencies not available: {ImportError}")
ophyd_devices = None
bec_server = None
if TYPE_CHECKING: # pragma no cover
try:
from ophyd_devices.utils.static_device_test import StaticDeviceTest
except ImportError:
StaticDeviceTest = None
class ValidationStatus(int, enum.Enum):
"""Validation status for device configurations."""
PENDING = 0 # colors.default
VALID = 1 # colors.highlight
FAILED = 2 # colors.emergency
class DeviceValidationResult(QtCore.QObject):
"""Simple object to inject validation signals into QRunnable."""
# Device validation signal, device_name, ValidationStatus as int, error message or ''
device_validated = QtCore.Signal(str, bool, str)
class DeviceValidationRunnable(QtCore.QRunnable):
"""Runnable for validating a device configuration."""
def __init__(
self,
device_name: str,
config: dict,
static_device_test: StaticDeviceTest | None,
connect: bool = False,
):
"""
Initialize the device validation runnable.
Args:
device_name (str): The name of the device to validate.
config (dict): The configuration dictionary for the device.
static_device_test (StaticDeviceTest): The static device test instance.
connect (bool, optional): Whether to connect to the device. Defaults to False.
"""
super().__init__()
self.device_name = device_name
self.config = config
self._connect = connect
self._static_device_test = static_device_test
self.signals = DeviceValidationResult()
def run(self):
"""Run method for device validation."""
if self._static_device_test is None:
logger.error(
f"Ophyd devices or bec_server not available, cannot run validation for device {self.device_name}."
)
return
try:
self._static_device_test.config = {self.device_name: self.config}
results = self._static_device_test.run_with_list_output(connect=self._connect)
success = results[0].success
msg = results[0].message
self.signals.device_validated.emit(self.device_name, success, msg)
except Exception:
content = traceback.format_exc()
logger.error(f"Validation failed for device {self.device_name}. Exception: {content}")
self.signals.device_validated.emit(self.device_name, False, content)
class ValidationListItem(QtWidgets.QWidget):
"""Custom list item widget showing device name and validation status."""
def __init__(self, device_name: str, device_config: dict, parent=None):
"""
Initialize the validation list item.
Args:
device_name (str): The name of the device.
device_config (dict): The configuration of the device.
validation_colors (dict[ValidationStatus, QtGui.QColor]): The colors for each validation status.
parent (QtWidgets.QWidget, optional): The parent widget.
"""
super().__init__(parent)
self.main_layout = QtWidgets.QHBoxLayout(self)
self.main_layout.setContentsMargins(2, 2, 2, 2)
self.main_layout.setSpacing(4)
self.device_name = device_name
self.device_config = device_config
self.validation_msg = "Validation in progress..."
self._setup_ui()
def _setup_ui(self):
"""Setup the UI for the list item."""
label = QtWidgets.QLabel(self.device_name)
self.main_layout.addWidget(label)
self.main_layout.addStretch()
self._spinner = SpinnerWidget(parent=self)
self._spinner.speed = 80
self._spinner.setFixedSize(24, 24)
self.main_layout.addWidget(self._spinner)
self._base_style = "font-weight: bold;"
self.setStyleSheet(self._base_style)
self._start_spinner()
def _start_spinner(self):
"""Start the spinner animation."""
self._spinner.start()
QtWidgets.QApplication.processEvents()
def _stop_spinner(self):
"""Stop the spinner animation."""
self._spinner.stop()
self._spinner.setVisible(False)
@SafeSlot()
def on_validation_restart(self):
"""Handle validation restart."""
self.validation_msg = ""
self._start_spinner()
self.setStyleSheet("") # Check if this works as expected
@SafeSlot(str)
def on_validation_failed(self, error_msg: str):
"""Handle validation failure."""
self.validation_msg = error_msg
colors = get_accent_colors()
self._stop_spinner()
self.main_layout.removeWidget(self._spinner)
self._spinner.deleteLater()
label = QtWidgets.QLabel("")
icon = material_icon("error", color=colors.emergency, size=(24, 24))
label.setPixmap(icon)
self.main_layout.addWidget(label)
class DMOphydTest(BECWidget, QtWidgets.QWidget):
"""Widget to test device configurations using ophyd devices."""
# Signal to emit the validation status of a device
device_validated = QtCore.Signal(str, int)
def __init__(self, parent=None, client=None):
super().__init__(parent=parent, client=client)
if not READY_TO_TEST:
self.setDisabled(True)
self.static_device_test = None
else:
from ophyd_devices.utils.static_device_test import StaticDeviceTest
self.static_device_test = StaticDeviceTest(config_dict={})
self._device_list_items: dict[str, QtWidgets.QListWidgetItem] = {}
self._thread_pool = QtCore.QThreadPool.globalInstance()
self._main_layout = QtWidgets.QVBoxLayout(self)
self._main_layout.setContentsMargins(0, 0, 0, 0)
self._main_layout.setSpacing(4)
# We add a splitter between the list and the text box
self.splitter = QtWidgets.QSplitter(QtCore.Qt.Orientation.Vertical)
self._main_layout.addWidget(self.splitter)
self._setup_list_ui()
self._setup_textbox_ui()
def _setup_list_ui(self):
"""Setup the list UI."""
self._list_widget = QtWidgets.QListWidget(self)
self._list_widget.setSelectionMode(QtWidgets.QAbstractItemView.SingleSelection)
self.splitter.addWidget(self._list_widget)
# Connect signals
self._list_widget.currentItemChanged.connect(self._on_current_item_changed)
def _setup_textbox_ui(self):
"""Setup the text box UI."""
self._text_box = QtWidgets.QTextEdit(self)
self._text_box.setReadOnly(True)
self._text_box.setFocusPolicy(QtCore.Qt.NoFocus)
self.splitter.addWidget(self._text_box)
@SafeSlot(dict)
def add_device_configs(self, device_configs: dict[str, dict]) -> None:
"""Receive an update with device configs.
Args:
device_configs (dict[str, dict]): The updated device configurations.
"""
for device_name, device_config in device_configs.items():
if device_name in self._device_list_items:
logger.error(f"Device {device_name} is already in the list.")
return
item = QtWidgets.QListWidgetItem(self._list_widget)
widget = ValidationListItem(device_name=device_name, device_config=device_config)
# wrap it in a QListWidgetItem
item.setSizeHint(widget.sizeHint())
self._list_widget.addItem(item)
self._list_widget.setItemWidget(item, widget)
self._device_list_items[device_name] = item
self._run_device_validation(widget)
@SafeSlot(dict)
def remove_device_configs(self, device_configs: dict[str, dict]) -> None:
"""Remove device configs from the list.
Args:
device_name (str): The name of the device to remove.
"""
for device_name in device_configs.keys():
if device_name not in self._device_list_items:
logger.warning(f"Device {device_name} not found in list.")
return
self._remove_list_item(device_name)
def _remove_list_item(self, device_name: str):
"""Remove a device from the list."""
# Get the list item
item = self._device_list_items.pop(device_name)
# Retrieve the custom widget attached to the item
widget = self._list_widget.itemWidget(item)
if widget is not None:
widget.deleteLater() # clean up custom widget
# Remove the item from the QListWidget
row = self._list_widget.row(item)
self._list_widget.takeItem(row)
def _run_device_validation(self, widget: ValidationListItem):
"""
Run the device validation in a separate thread.
Args:
widget (ValidationListItem): The widget to validate.
"""
if not READY_TO_TEST:
logger.error("Ophyd devices or bec_server not available, cannot run validation.")
return
if (
widget.device_name in self.client.device_manager.devices
): # TODO and config has to be exact the same..
self._on_device_validated(
widget.device_name,
ValidationStatus.VALID,
f"Device {widget.device_name} is already in active config",
)
return
runnable = DeviceValidationRunnable(
device_name=widget.device_name,
config=widget.device_config,
static_device_test=self.static_device_test,
connect=False,
)
runnable.signals.device_validated.connect(self._on_device_validated)
self._thread_pool.start(runnable)
@SafeSlot(str, bool, str)
def _on_device_validated(self, device_name: str, success: bool, message: str):
"""Handle the device validation result.
Args:
device_name (str): The name of the device.
success (bool): Whether the validation was successful.
message (str): The validation message.
"""
logger.info(f"Device {device_name} validation result: {success}, message: {message}")
item = self._device_list_items.get(device_name, None)
if not item:
logger.error(f"Device {device_name} not found in the list.")
return
if success:
self._remove_list_item(device_name=device_name)
self.device_validated.emit(device_name, ValidationStatus.VALID.value)
else:
widget: ValidationListItem = self._list_widget.itemWidget(item)
widget.on_validation_failed(message)
self.device_validated.emit(device_name, ValidationStatus.FAILED.value)
def _on_current_item_changed(
self, current: QtWidgets.QListWidgetItem, previous: QtWidgets.QListWidgetItem
):
"""Handle the current item change in the list widget.
Args:
current (QListWidgetItem): The currently selected item.
previous (QListWidgetItem): The previously selected item.
"""
widget: ValidationListItem = self._list_widget.itemWidget(current)
if widget:
try:
formatted_html = self._format_validation_message(widget.validation_msg)
self._text_box.setHtml(formatted_html)
except Exception as e:
logger.error(f"Error formatting validation message: {e}")
self._text_box.setPlainText(widget.validation_msg)
def _format_validation_message(self, raw_msg: str) -> str:
"""Simple HTML formatting for validation messages, wrapping text naturally."""
if not raw_msg.strip():
return "<i>Validation in progress...</i>"
if raw_msg == "Validation in progress...":
return "<i>Validation in progress...</i>"
raw_msg = escape(raw_msg)
# Split into lines
lines = raw_msg.splitlines()
summary = lines[0] if lines else "Validation Result"
rest = "\n".join(lines[1:]).strip()
# Split traceback / final ERROR
tb_match = re.search(r"(Traceback.*|ERROR:.*)$", rest, re.DOTALL | re.MULTILINE)
if tb_match:
main_text = rest[: tb_match.start()].strip()
error_detail = tb_match.group().strip()
else:
main_text = rest
error_detail = ""
# Highlight field names in orange (simple regex for word: Field)
main_text_html = re.sub(
r"(\b\w+\b)(?=: Field required)",
r'<span style="color:#FF8C00; font-weight:bold;">\1</span>',
main_text,
)
# Wrap in div for monospace, allowing wrapping
main_text_html = (
f'<div style="white-space: pre-wrap;">{main_text_html}</div>' if main_text_html else ""
)
# Traceback / error in red
error_html = (
f'<div style="white-space: pre-wrap; color:#A00000;">{error_detail}</div>'
if error_detail
else ""
)
# Summary at top, dark red
html = (
f'<div style="font-family: monospace; font-size:13px; white-space: pre-wrap;">'
f'<div style="font-weight:bold; color:#8B0000; margin-bottom:4px;">{summary}</div>'
f"{main_text_html}"
f"{error_html}"
f"</div>"
)
return html
@SafeSlot()
def clear_list(self):
"""Clear the device list."""
self._thread_pool.clear()
if self._thread_pool.waitForDone(2000) is False: # Wait for threads to finish
logger.error("Failed to wait for threads to finish. Removing items from the list.")
self._device_list_items.clear()
self._list_widget.clear()
def remove_device(self, device_name: str):
"""Remove a device from the list."""
item = self._device_list_items.pop(device_name, None)
if item:
self._list_widget.removeItemWidget(item)
if __name__ == "__main__":
import sys
from bec_lib.bec_yaml_loader import yaml_load
# pylint: disable=ungrouped-imports
from qtpy.QtWidgets import QApplication
app = QApplication(sys.argv)
device_manager_ophyd_test = DMOphydTest()
config_path = "/Users/appel_c/work_psi_awi/bec_workspace/csaxs_bec/csaxs_bec/device_configs/endstation.yaml"
cfg = yaml_load(config_path)
cfg.update({"device_will_fail": {"name": "device_will_fail", "some_param": 1}})
device_manager_ophyd_test.add_device_configs(cfg)
device_manager_ophyd_test.show()
device_manager_ophyd_test.setWindowTitle("Device Manager Ophyd Test")
device_manager_ophyd_test.resize(800, 600)
sys.exit(app.exec_())