Compare commits

..

1 Commits

Author SHA1 Message Date
wyzula_j fcefe9e33d wip push and autohide for docks 2025-09-04 14:16:29 +02:00
20 changed files with 379 additions and 2547 deletions
@@ -0,0 +1,98 @@
from typing import List
import PySide6QtAds as QtAds
from PySide6.QtWidgets import QTableWidget, QListWidget, QTableWidgetItem, QPushButton
from PySide6QtAds import CDockManager, CDockWidget
from qtpy.QtCore import Qt, QTimer
from qtpy.QtWidgets import QSplitter, QTreeWidget, QVBoxLayout, QWidget
from bec_widgets.utils.colors import apply_theme
from bec_widgets.widgets.containers.advanced_dock_area.advanced_dock_area import AdvancedDockArea
from bec_widgets.widgets.containers.main_window.main_window import BECMainWindow
class AutoHideOverlay(QWidget):
def __init__(self, parent=None, *args, **kwargs):
super().__init__(parent, *args, **kwargs)
# Top-level layout hosting a toolbar and the dock manager
self._root_layout = QVBoxLayout(self)
self._root_layout.setContentsMargins(0, 0, 0, 0)
self._root_layout.setSpacing(0)
# IMPORTANT, if you decide to use autohide, you must set these flags before creating ANY CDockManager, it has to be put into inity, let me know and I can enforce it
CDockManager.setAutoHideConfigFlags(CDockManager.DefaultAutoHideConfig)
CDockManager.setAutoHideConfigFlag(
CDockManager.DockAreaHasAutoHideButton, False
) # to not have everywhere these buttons
self.dock_manager = CDockManager(self)
self._root_layout.addWidget(self.dock_manager)
# Initialize the widgets
self.left_widget = QWidget(self)
self.left_widget.layout = QVBoxLayout(self.left_widget)
self.auto_hide_controls = QPushButton("Auto Hide Controls", self.left_widget)
self.auto_hide_controls.setCheckable(True)
self.tree_widget = QTreeWidget(self)
self.left_widget.layout.addWidget(self.auto_hide_controls)
self.left_widget.layout.addWidget(self.tree_widget)
self.plotting_ads = AdvancedDockArea(self, mode="plot", default_add_direction="bottom")
# table with some data
self.table = QTableWidget(10, 3, self)
self.table.setHorizontalHeaderLabels(["Column 1", "Column 2", "Column 3"])
for row in range(10):
for col in range(3):
self.table.setItem(row, col, QTableWidgetItem(f"Item {row+1}, {col+1}"))
self.list_widget = QListWidget(self)
for i in range(10):
self.list_widget.addItem(f"List Item {i+1}")
# Create the dock widgets
self.tree_dock = QtAds.CDockWidget("Explorer", self)
self.tree_dock.setWidget(self.left_widget)
self.table_dock = QtAds.CDockWidget("Table", self)
self.table_dock.setWidget(self.table)
self.plotting_ads_dock = QtAds.CDockWidget("Plotting Area", self)
self.plotting_ads_dock.setWidget(self.plotting_ads)
# this one will be autohide one
self.list_dock = QtAds.CDockWidget("List Widget", self)
self.list_dock.setWidget(self.list_widget)
# Monaco will be central widget
self.dock_manager.setCentralWidget(self.plotting_ads_dock)
# Add the dock widgets to the dock manager
self.dock_manager.addDockWidget(QtAds.DockWidgetArea.BottomDockWidgetArea, self.table_dock)
self.dock_manager.addDockWidget(QtAds.DockWidgetArea.LeftDockWidgetArea, self.tree_dock)
self.autohide_container = self.dock_manager.addAutoHideDockWidget(
QtAds.SideBarRight, self.list_dock
)
self.autohide_container.setSize(350)
# Connect signals
self.auto_hide_controls.toggled.connect(self.toggle_auto_hide)
def toggle_auto_hide(self, checked: bool):
# start as collapsed, implement better logic
# self.autohide_container.collapseView(checked)
# or this if you just want toggle
self.autohide_container.toggleCollapseState()
if __name__ == "__main__":
import sys
from qtpy.QtWidgets import QApplication
app = QApplication(sys.argv)
apply_theme("dark")
auto_hide_overlay = AutoHideOverlay()
auto_hide_overlay.show()
auto_hide_overlay.resize(1200, 800)
sys.exit(app.exec())
@@ -0,0 +1,123 @@
from typing import Optional
import PySide6QtAds as QtAds
from PySide6.QtWidgets import QTableWidget, QListWidget, QTableWidgetItem, QPushButton
from PySide6QtAds import CDockManager, CDockWidget
from qtpy.QtCore import Qt, QTimer
from qtpy.QtWidgets import QSplitter, QTreeWidget, QVBoxLayout, QWidget
from bec_widgets.utils.colors import apply_theme
from bec_widgets.widgets.containers.advanced_dock_area.advanced_dock_area import AdvancedDockArea
from bec_widgets.widgets.containers.main_window.main_window import BECMainWindow
class AutoHidePush(QWidget):
def __init__(self, parent=None, *args, **kwargs):
super().__init__(parent, *args, **kwargs)
# Top-level layout hosting a toolbar and the dock manager
self._root_layout = QVBoxLayout(self)
self._root_layout.setContentsMargins(0, 0, 0, 0)
self._root_layout.setSpacing(0)
# CDockManager.setConfigFlag(CDockManager.FocusHighlighting, True)
CDockManager.setAutoHideConfigFlags(CDockManager.DefaultAutoHideConfig)
# CDockManager.setAutoHideConfigFlag(CDockManager.AutoHideShowOnMouseOver, True)
self.dock_manager = CDockManager(self)
self._root_layout.addWidget(self.dock_manager)
# Initialize the widgets
self.left_widget = QWidget(self)
self.left_widget.layout = QVBoxLayout(self.left_widget)
self.push_mode_btn = QPushButton(self.left_widget)
self.push_mode_btn.setCheckable(True)
self.push_mode_btn.setText("Pin (show and push)")
self.push_mode_btn.setChecked(False)
self.tree_widget = QTreeWidget(self)
self.left_widget.layout.addWidget(self.push_mode_btn)
self.left_widget.layout.addWidget(self.tree_widget)
self.plotting_ads = AdvancedDockArea(self, mode="plot", default_add_direction="bottom")
# table with some data
self.table = QTableWidget(10, 3, self)
self.table.setHorizontalHeaderLabels(["Column 1", "Column 2", "Column 3"])
for row in range(10):
for col in range(3):
self.table.setItem(row, col, QTableWidgetItem(f"Item {row+1}, {col+1}"))
self.list_widget = QListWidget(self)
for i in range(10):
self.list_widget.addItem(f"List Item {i+1}")
# Create the dock widgets
self.tree_dock = QtAds.CDockWidget("Explorer", self)
self.tree_dock.setWidget(self.left_widget)
self.table_dock = QtAds.CDockWidget("Table", self)
self.table_dock.setWidget(self.table)
self.plotting_ads_dock = QtAds.CDockWidget("Plotting Area", self)
self.plotting_ads_dock.setWidget(self.plotting_ads)
# this one will be autohide one
self.list_dock = QtAds.CDockWidget("List Widget", self)
self.list_dock.setWidget(self.list_widget)
# Monaco will be central widget
self.dock_manager.setCentralWidget(self.plotting_ads_dock)
# Add the dock widgets to the dock manager
self.dock_manager.addDockWidget(QtAds.DockWidgetArea.BottomDockWidgetArea, self.table_dock)
self.dock_manager.addDockWidget(QtAds.DockWidgetArea.LeftDockWidgetArea, self.tree_dock)
self.autohide_container = self.dock_manager.addAutoHideDockWidget(
QtAds.SideBarRight, self.list_dock
)
self.autohide_container.setSize(350)
self._last_side = QtAds.SideBarRight
# Ensure auto-hide starts collapsed and button text is correct
self.autohide_container.collapseView(True)
self.push_mode_btn.setText("Pin (show and push)")
# Connect signals
self.push_mode_btn.toggled.connect(self.toggle_pin_mode)
def toggle_pin_mode(self, pinned: bool):
# pinned=True -> convert auto-hide overlay into a normal dock (push layout) and show it
if pinned:
if self.autohide_container is not None:
# Remember the current side (edge) to restore when unpinning
if hasattr(self.autohide_container, "sideBarLocation"):
self._last_side = self.autohide_container.sideBarLocation()
# Move contents back into the dock container (this deletes the auto-hide container)
self.autohide_container.moveContentsToParent()
self.autohide_container = None
# Ensure the dock is visible when pinned
self.list_dock.show()
self.push_mode_btn.setText("Unpin (send to sidebar)")
else:
# Convert the pinned dock back into an auto-hide overlay at the last used side and collapse it
side = getattr(self, "_last_side", QtAds.SideBarRight)
container = self.dock_manager.addAutoHideDockWidget(side, self.list_dock)
# Preserve a sensible size from the current dock widget geometry
if side in (QtAds.SideBarLeft, QtAds.SideBarRight):
target_size = max(200, min(self.list_dock.width(), 600))
else:
target_size = max(200, min(self.list_dock.height(), 600))
container.setSize(int(target_size))
self.autohide_container = container
# Collapse so it disappears to the side tab
self.autohide_container.collapseView(True)
self.push_mode_btn.setText("Pin (show and push)")
if __name__ == "__main__":
import sys
from qtpy.QtWidgets import QApplication
app = QApplication(sys.argv)
apply_theme("dark")
auto_hide_overlay = AutoHidePush()
auto_hide_overlay.show()
auto_hide_overlay.resize(1200, 800)
sys.exit(app.exec())
@@ -1,67 +0,0 @@
from qtpy import QtCore, QtWidgets
from bec_widgets.examples.device_manager_view.device_manager_view import DeviceManagerView
from bec_widgets.widgets.containers.advanced_dock_area.advanced_dock_area import AdvancedDockArea
class BECMainApp(QtWidgets.QWidget):
def __init__(self, parent=None):
super().__init__(parent)
# Main layout
layout = QtWidgets.QVBoxLayout(self)
layout.setContentsMargins(0, 0, 0, 0)
layout.setSpacing(0)
# Tab widget as central area
self.tabs = QtWidgets.QTabWidget(self)
self.tabs.setContentsMargins(0, 0, 0, 0)
self.tabs.setTabPosition(QtWidgets.QTabWidget.West) # Tabs on the left side
layout.addWidget(self.tabs)
# Add DM
self._add_device_manager_view()
# Add Plot area
self._add_ad_dockarea()
# Adjust size of tab bar
# TODO not yet properly working, tabs a spread across the full length, to be checked!
tab_bar = self.tabs.tabBar()
tab_bar.setFixedWidth(tab_bar.sizeHint().width())
def _add_device_manager_view(self) -> None:
self.device_manager_view = DeviceManagerView(parent=self)
self.add_tab(self.device_manager_view, "Device Manager")
def _add_ad_dockarea(self) -> None:
self.advanced_dock_area = AdvancedDockArea(parent=self)
self.add_tab(self.advanced_dock_area, "Plot Area")
def add_tab(self, widget: QtWidgets.QWidget, title: str):
"""Add a custom QWidget as a tab."""
tab_container = QtWidgets.QWidget()
tab_layout = QtWidgets.QVBoxLayout(tab_container)
tab_layout.setContentsMargins(0, 0, 0, 0)
tab_layout.setSpacing(0)
tab_layout.addWidget(widget)
self.tabs.addTab(tab_container, title)
if __name__ == "__main__":
import sys
from bec_lib.bec_yaml_loader import yaml_load
from bec_qthemes import apply_theme
app = QtWidgets.QApplication(sys.argv)
apply_theme("light")
win = BECMainApp()
config_path = "/Users/appel_c/work_psi_awi/bec_workspace/csaxs_bec/csaxs_bec/device_configs/first_light.yaml"
cfg = yaml_load(config_path)
cfg.update({"device_will_fail": {"name": "device_will_fail", "some_param": 1}})
win.device_manager_view.device_table_view.set_device_config(cfg)
win.resize(1920, 1080)
win.show()
sys.exit(app.exec_())
@@ -1,491 +0,0 @@
from __future__ import annotations
import os
from typing import TYPE_CHECKING, List
import PySide6QtAds as QtAds
import yaml
from bec_lib.bec_yaml_loader import yaml_load
from bec_lib.file_utils import DeviceConfigWriter
from bec_lib.logger import bec_logger
from bec_lib.plugin_helper import plugin_package_name, plugin_repo_path
from bec_qthemes import apply_theme
from PySide6QtAds import CDockManager, CDockWidget
from qtpy.QtCore import Qt, QTimer
from qtpy.QtWidgets import QFileDialog, QMessageBox, QSplitter, QVBoxLayout, QWidget
from bec_widgets import BECWidget
from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.utils.toolbars.actions import MaterialIconAction
from bec_widgets.utils.toolbars.bundles import ToolbarBundle
from bec_widgets.utils.toolbars.toolbar import ModularToolBar
from bec_widgets.widgets.containers.advanced_dock_area.advanced_dock_area import AdvancedDockArea
from bec_widgets.widgets.control.device_manager.components import (
DeviceTableView,
DMConfigView,
DMOphydTest,
DocstringView,
)
from bec_widgets.widgets.control.device_manager.components.available_device_resources.available_device_resources import (
AvailableDeviceResources,
)
if TYPE_CHECKING:
from bec_lib.client import BECClient
logger = bec_logger.logger
def set_splitter_weights(splitter: QSplitter, weights: List[float]) -> None:
"""
Apply initial sizes to a splitter using weight ratios, e.g. [1,3,2,1].
Works for horizontal or vertical splitters and sets matching stretch factors.
"""
def apply():
n = splitter.count()
if n == 0:
return
w = list(weights[:n]) + [1] * max(0, n - len(weights))
w = [max(0.0, float(x)) for x in w]
tot_w = sum(w)
if tot_w <= 0:
w = [1.0] * n
tot_w = float(n)
total_px = (
splitter.width() if splitter.orientation() == Qt.Horizontal else splitter.height()
)
if total_px < 2:
QTimer.singleShot(0, apply)
return
sizes = [max(1, int(total_px * (wi / tot_w))) for wi in w]
diff = total_px - sum(sizes)
if diff != 0:
idx = max(range(n), key=lambda i: w[i])
sizes[idx] = max(1, sizes[idx] + diff)
splitter.setSizes(sizes)
for i, wi in enumerate(w):
splitter.setStretchFactor(i, max(1, int(round(wi * 100))))
QTimer.singleShot(0, apply)
class DeviceManagerView(BECWidget, QWidget):
def __init__(self, parent=None, *args, **kwargs):
super().__init__(parent=parent, client=None, *args, **kwargs)
# Top-level layout hosting a toolbar and the dock manager
self._root_layout = QVBoxLayout(self)
self._root_layout.setContentsMargins(0, 0, 0, 0)
self._root_layout.setSpacing(0)
self.dock_manager = CDockManager(self)
self._root_layout.addWidget(self.dock_manager)
# Available Resources Widget
self.available_devices = AvailableDeviceResources(self)
self.available_devices_dock = QtAds.CDockWidget("Available Devices", self)
self.available_devices_dock.setWidget(self.available_devices)
# Device Table View widget
self.device_table_view = DeviceTableView(self)
self.device_table_view_dock = QtAds.CDockWidget("Device Table", self)
self.device_table_view_dock.setWidget(self.device_table_view)
# Device Config View widget
self.dm_config_view = DMConfigView(self)
self.dm_config_view_dock = QtAds.CDockWidget("Device Config View", self)
self.dm_config_view_dock.setWidget(self.dm_config_view)
# Docstring View
self.dm_docs_view = DocstringView(self)
self.dm_docs_view_dock = QtAds.CDockWidget("Docstring View", self)
self.dm_docs_view_dock.setWidget(self.dm_docs_view)
# Ophyd Test view
self.ophyd_test_view = DMOphydTest(self)
self.ophyd_test_dock_view = QtAds.CDockWidget("Ophyd Test View", self)
self.ophyd_test_dock_view.setWidget(self.ophyd_test_view)
# Arrange widgets within the QtAds dock manager
# Central widget area
self.central_dock_area = self.dock_manager.setCentralWidget(self.device_table_view_dock)
self.dock_manager.addDockWidget(
QtAds.DockWidgetArea.BottomDockWidgetArea,
self.dm_docs_view_dock,
self.central_dock_area,
)
# Left Area
self.left_dock_area = self.dock_manager.addDockWidget(
QtAds.DockWidgetArea.LeftDockWidgetArea, self.available_devices_dock
)
self.dock_manager.addDockWidget(
QtAds.DockWidgetArea.BottomDockWidgetArea, self.dm_config_view_dock, self.left_dock_area
)
# Right area
self.dock_manager.addDockWidget(
QtAds.DockWidgetArea.RightDockWidgetArea, self.ophyd_test_dock_view
)
for dock in self.dock_manager.dockWidgets():
# dock.setFeature(CDockWidget.DockWidgetDeleteOnClose, True)#TODO implement according to MonacoDock or AdvancedDockArea
# dock.setFeature(CDockWidget.CustomCloseHandling, True) #TODO same
dock.setFeature(CDockWidget.DockWidgetClosable, False)
dock.setFeature(CDockWidget.DockWidgetFloatable, False)
dock.setFeature(CDockWidget.DockWidgetMovable, False)
# Fetch all dock areas of the dock widgets (on our case always one dock area)
for dock in self.dock_manager.dockWidgets():
area = dock.dockAreaWidget()
area.titleBar().setVisible(False)
# Apply stretch after the layout is done
self.set_default_view([2, 8, 2], [3, 1])
# self.set_default_view([2, 8, 2], [2, 2, 4])
# Connect slots
self.device_table_view.selected_device.connect(self.dm_config_view.on_select_config)
self.device_table_view.selected_device.connect(self.dm_docs_view.on_select_config)
self.ophyd_test_view.device_validated.connect(
self.device_table_view.update_device_validation
)
self.device_table_view.device_configs_added.connect(self.ophyd_test_view.add_device_configs)
self._add_toolbar()
def _add_toolbar(self):
self.toolbar = ModularToolBar(self)
# Add IO actions
self._add_io_actions()
self._add_table_actions()
self.toolbar.show_bundles(["IO", "Table"])
self._root_layout.insertWidget(0, self.toolbar)
def _add_io_actions(self):
# Create IO bundle
io_bundle = ToolbarBundle("IO", self.toolbar.components)
# Add load config from plugin dir
self.toolbar.add_bundle(io_bundle)
load = MaterialIconAction(
icon_name="file_open", parent=self, tooltip="Load configuration file from disk"
)
self.toolbar.components.add_safe("load", load)
load.action.triggered.connect(self._load_file_action)
io_bundle.add_action("load")
# Add safe to disk
safe_to_disk = MaterialIconAction(
icon_name="file_save", parent=self, tooltip="Save config to disk"
)
self.toolbar.components.add_safe("safe_to_disk", safe_to_disk)
safe_to_disk.action.triggered.connect(self._safe_to_disk_action)
io_bundle.add_action("safe_to_disk")
# Add load config from redis
load_redis = MaterialIconAction(
icon_name="cached", parent=self, tooltip="Load current config from Redis"
)
load_redis.action.triggered.connect(self._load_redis_action)
self.toolbar.components.add_safe("load_redis", load_redis)
io_bundle.add_action("load_redis")
# Update config action
update_config_redis = MaterialIconAction(
icon_name="cloud_upload", parent=self, tooltip="Update current config in Redis"
)
update_config_redis.action.triggered.connect(self._update_redis_action)
self.toolbar.components.add_safe("update_config_redis", update_config_redis)
io_bundle.add_action("update_config_redis")
# Table actions
def _add_table_actions(self) -> None:
table_bundle = ToolbarBundle("Table", self.toolbar.components)
# Add load config from plugin dir
self.toolbar.add_bundle(table_bundle)
# Reset composed view
reset_composed = MaterialIconAction(
icon_name="delete_sweep", parent=self, tooltip="Reset current composed config view"
)
reset_composed.action.triggered.connect(self._reset_composed_view)
self.toolbar.components.add_safe("reset_composed", reset_composed)
table_bundle.add_action("reset_composed")
# Add device
add_device = MaterialIconAction(icon_name="add", parent=self, tooltip="Add new device")
add_device.action.triggered.connect(self._add_device_action)
self.toolbar.components.add_safe("add_device", add_device)
table_bundle.add_action("add_device")
# Remove device
remove_device = MaterialIconAction(icon_name="remove", parent=self, tooltip="Remove device")
remove_device.action.triggered.connect(self._remove_device_action)
self.toolbar.components.add_safe("remove_device", remove_device)
table_bundle.add_action("remove_device")
# Rerun validation
rerun_validation = MaterialIconAction(
icon_name="checklist", parent=self, tooltip="Run device validation on selected devices"
)
rerun_validation.action.triggered.connect(self._rerun_validation_action)
self.toolbar.components.add_safe("rerun_validation", rerun_validation)
table_bundle.add_action("rerun_validation")
# Most likly, no actions on available devices
# Actions (vielleicht bundle fuer available devices )
# - reset composed view
# - add new device (EpicsMotor, EpicsMotorECMC, EpicsSignal, CustomDevice)
# - remove device
# - rerun validation (with/without connect)
# IO actions
@SafeSlot()
def _load_file_action(self):
"""Action for the 'load' action to load a config from disk for the io_bundle of the toolbar."""
# Check if plugin repo is installed...
try:
plugin_path = plugin_repo_path()
plugin_name = plugin_package_name()
config_path = os.path.join(plugin_path, plugin_name, "device_configs")
except ValueError:
# Get the recovery config path as fallback
config_path = self._get_recovery_config_path()
logger.warning(
f"No plugin repository installed, fallback to recovery config path: {config_path}"
)
# Implement the file loading logic here
start_dir = os.path.abspath(config_path)
file_path, _ = QFileDialog.getOpenFileName(
self, caption="Select Config File", dir=start_dir
)
if file_path:
try:
config = yaml_load(file_path)
except Exception as e:
logger.error(f"Failed to load config from file {file_path}. Error: {e}")
return
self.device_table_view.set_device_config(
config
) # TODO ADD QDialog with 'replace', 'add' & 'cancel'
# TODO would we ever like to add the current config to an existing composition
@SafeSlot()
def _load_redis_action(self):
"""Action for the 'load_redis' action to load the current config from Redis for the io_bundle of the toolbar."""
reply = QMessageBox.question(
self,
"Load currently active config",
"Do you really want to flush the current config and reload?",
QMessageBox.Yes | QMessageBox.No,
QMessageBox.No,
)
if reply == QMessageBox.Yes:
cfg = {}
config_list = self.client.device_manager._get_redis_device_config()
for item in config_list:
k = item["name"]
item.pop("name")
cfg[k] = item
self.device_table_view.set_device_config(cfg)
else:
return
@SafeSlot()
def _safe_to_disk_action(self):
"""Action for the 'safe_to_disk' action to save the current config to disk."""
# Check if plugin repo is installed...
try:
config_path = self._get_recovery_config_path()
except ValueError:
# Get the recovery config path as fallback
config_path = os.path.abspath(os.path.expanduser("~"))
logger.warning(f"Failed to find recovery config path, fallback to: {config_path}")
# Implement the file loading logic here
file_path, _ = QFileDialog.getSaveFileName(
self, caption="Save Config File", dir=config_path
)
if file_path:
config = self.device_table_view.get_device_config()
with open(file_path, "w") as file:
file.write(yaml.dump(config))
# TODO add here logic, should be asyncronous, but probably block UI, and show a loading spinner. If failed, it should report..
@SafeSlot()
def _update_redis_action(self):
"""Action for the 'update_redis' action to update the current config in Redis."""
config = self.device_table_view.get_device_config()
reply = QMessageBox.question(
self,
"Not implemented yet",
"This feature has not been implemented yet, will be coming soon...!!",
QMessageBox.Cancel,
QMessageBox.Cancel,
)
# Table actions
@SafeSlot()
def _reset_composed_view(self):
"""Action for the 'reset_composed_view' action to reset the composed view."""
reply = QMessageBox.question(
self,
"Clear View",
"You are about to clear the current composed config view, please confirm...",
QMessageBox.Yes | QMessageBox.No,
QMessageBox.No,
)
if reply == QMessageBox.Yes:
self.device_table_view.clear_device_configs()
# TODO Here we would like to implement a custom popup view, that allows to add new devices
# We want to have a combobox to choose from EpicsMotor, EpicsMotorECMC, EpicsSignal, EpicsSignalRO, and maybe EpicsSignalWithRBV and custom Device
# For all default Epics devices, we would like to preselect relevant fields, and prompt them with the proper deviceConfig args already, i.e. 'prefix', 'read_pv', 'write_pv' etc..
# For custom Device, they should receive all options. It might be cool to get a side panel with docstring view of the class upon inspecting it to make it easier in case deviceConfig entries are required..
@SafeSlot()
def _add_device_action(self):
"""Action for the 'add_device' action to add a new device."""
# Implement the logic to add a new device
reply = QMessageBox.question(
self,
"Not implemented yet",
"This feature has not been implemented yet, will be coming soon...!!",
QMessageBox.Cancel,
QMessageBox.Cancel,
)
# TODO fix the device table remove actions. This is currently not working properly...
@SafeSlot()
def _remove_device_action(self):
"""Action for the 'remove_device' action to remove a device."""
reply = QMessageBox.question(
self,
"Not implemented yet",
"This feature has not been implemented yet, will be coming soon...!!",
QMessageBox.Cancel,
QMessageBox.Cancel,
)
# TODO implement proper logic for validation. We should also carefully review how these jobs update the table, and how we can cancel pending validations
# in case they are no longer relevant. We might want to 'block' the interactivity on the items for which validation runs with 'connect'!
@SafeSlot()
def _rerun_validation_action(self):
"""Action for the 'rerun_validation' action to rerun validation on selected devices."""
# Implement the logic to rerun validation on selected devices
reply = QMessageBox.question(
self,
"Not implemented yet",
"This feature has not been implemented yet, will be coming soon...!!",
QMessageBox.Cancel,
QMessageBox.Cancel,
)
####### Default view has to be done with setting up splitters ########
def set_default_view(self, horizontal_weights: list, vertical_weights: list):
"""Apply initial weights to every horizontal and vertical splitter.
Examples:
horizontal_weights = [1, 3, 2, 1]
vertical_weights = [3, 7] # top:bottom = 30:70
"""
splitters_h = []
splitters_v = []
for splitter in self.findChildren(QSplitter):
if splitter.orientation() == Qt.Horizontal:
splitters_h.append(splitter)
elif splitter.orientation() == Qt.Vertical:
splitters_v.append(splitter)
def apply_all():
for s in splitters_h:
set_splitter_weights(s, horizontal_weights)
for s in splitters_v:
set_splitter_weights(s, vertical_weights)
QTimer.singleShot(0, apply_all)
def set_stretch(self, *, horizontal=None, vertical=None):
"""Update splitter weights and re-apply to all splitters.
Accepts either a list/tuple of weights (e.g., [1,3,2,1]) or a role dict
for convenience: horizontal roles = {"left","center","right"},
vertical roles = {"top","bottom"}.
"""
def _coerce_h(x):
if x is None:
return None
if isinstance(x, (list, tuple)):
return list(map(float, x))
if isinstance(x, dict):
return [
float(x.get("left", 1)),
float(x.get("center", x.get("middle", 1))),
float(x.get("right", 1)),
]
return None
def _coerce_v(x):
if x is None:
return None
if isinstance(x, (list, tuple)):
return list(map(float, x))
if isinstance(x, dict):
return [float(x.get("top", 1)), float(x.get("bottom", 1))]
return None
h = _coerce_h(horizontal)
v = _coerce_v(vertical)
if h is None:
h = [1, 1, 1]
if v is None:
v = [1, 1]
self.set_default_view(h, v)
def _get_recovery_config_path(self) -> str:
"""Get the recovery config path from the log_writer config."""
# pylint: disable=protected-access
log_writer_config: BECClient = self.client._service_config.config.get("log_writer", {})
writer = DeviceConfigWriter(service_config=log_writer_config)
return os.path.abspath(os.path.expanduser(writer.get_recovery_directory()))
if __name__ == "__main__":
import sys
from copy import deepcopy
from bec_lib.bec_yaml_loader import yaml_load
from qtpy.QtWidgets import QApplication
from bec_widgets.widgets.utility.visual.dark_mode_button.dark_mode_button import DarkModeButton
app = QApplication(sys.argv)
w = QWidget()
l = QVBoxLayout()
w.setLayout(l)
apply_theme("dark")
button = DarkModeButton()
l.addWidget(button)
device_manager_view = DeviceManagerView()
l.addWidget(device_manager_view)
config_path = "/Users/appel_c/work_psi_awi/bec_workspace/csaxs_bec/csaxs_bec/device_configs/first_light.yaml"
cfg = yaml_load(config_path)
cfg.update({"device_will_fail": {"name": "device_will_fail", "some_param": 1}})
# config = device_manager_view.client.device_manager._get_redis_device_config()
device_manager_view.device_table_view.set_device_config(cfg)
w.show()
w.setWindowTitle("Device Manager View")
w.resize(1920, 1080)
# developer_view.set_stretch(horizontal=[1, 3, 2], vertical=[5, 5]) #can be set during runtime
sys.exit(app.exec_())
@@ -1,110 +0,0 @@
"""Top Level wrapper for device_manager widget"""
from __future__ import annotations
import os
from bec_lib.bec_yaml_loader import yaml_load
from bec_lib.logger import bec_logger
from bec_qthemes import material_icon
from qtpy import QtCore, QtWidgets
from bec_widgets.examples.device_manager_view.device_manager_view import DeviceManagerView
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.error_popups import SafeSlot
logger = bec_logger.logger
class DeviceManagerWidget(BECWidget, QtWidgets.QWidget):
def __init__(self, parent=None, client=None):
super().__init__(client=client, parent=parent)
self.stacked_layout = QtWidgets.QStackedLayout()
self.stacked_layout.setContentsMargins(0, 0, 0, 0)
self.stacked_layout.setSpacing(0)
self.stacked_layout.setStackingMode(QtWidgets.QStackedLayout.StackAll)
self.setLayout(self.stacked_layout)
# Add device manager view
self.device_manager_view = DeviceManagerView()
self.stacked_layout.addWidget(self.device_manager_view)
# Add overlay widget
self._overlay_widget = QtWidgets.QWidget(self)
self._customize_overlay()
self.stacked_layout.addWidget(self._overlay_widget)
self.stacked_layout.setCurrentWidget(self._overlay_widget)
def _customize_overlay(self):
self._overlay_widget.setStyleSheet(
"background: qlineargradient(x1:0, y1:0, x2:0, y2:1,stop:0 #ffffff, stop:1 #e0e0e0);"
)
self._overlay_widget.setAutoFillBackground(True)
self._overlay_layout = QtWidgets.QVBoxLayout()
self._overlay_layout.setAlignment(QtCore.Qt.AlignmentFlag.AlignCenter)
self._overlay_widget.setLayout(self._overlay_layout)
self._overlay_widget.setSizePolicy(
QtWidgets.QSizePolicy.Policy.Expanding, QtWidgets.QSizePolicy.Policy.Expanding
)
# Load current config
self.button_load_current_config = QtWidgets.QPushButton("Load Current Config")
icon = material_icon(icon_name="database", size=(24, 24), convert_to_pixmap=False)
self.button_load_current_config.setIcon(icon)
self._overlay_layout.addWidget(self.button_load_current_config)
self.button_load_current_config.clicked.connect(self._load_config_clicked)
# Load config from disk
self.button_load_config_from_file = QtWidgets.QPushButton("Load Config From File")
icon = material_icon(icon_name="folder", size=(24, 24), convert_to_pixmap=False)
self.button_load_config_from_file.setIcon(icon)
self._overlay_layout.addWidget(self.button_load_config_from_file)
self.button_load_config_from_file.clicked.connect(self._load_config_from_file_clicked)
self._overlay_widget.setVisible(True)
def _load_config_from_file_clicked(self):
"""Handle click on 'Load Config From File' button."""
start_dir = os.path.expanduser("~")
file_path, _ = QtWidgets.QFileDialog.getOpenFileName(
self, caption="Select Config File", dir=start_dir
)
if file_path:
self._load_config_from_file(file_path)
def _load_config_from_file(self, file_path: str):
try:
config = yaml_load(file_path)
except Exception as e:
logger.error(f"Failed to load config from file {file_path}. Error: {e}")
return
config_list = []
for name, cfg in config.items():
config_list.append(cfg)
config_list[-1]["name"] = name
self.device_manager_view.device_table_view.set_device_config(config_list)
# self.device_manager_view.ophyd_test.on_device_config_update(config)
self.stacked_layout.setCurrentWidget(self.device_manager_view)
@SafeSlot()
def _load_config_clicked(self):
"""Handle click on 'Load Current Config' button."""
config = self.client.device_manager._get_redis_device_config()
config.append({"name": "wrong_device", "some_value": 1})
self.device_manager_view.device_table_view.set_device_config(config)
# self.device_manager_view.ophyd_test.on_device_config_update(config)
self.stacked_layout.setCurrentWidget(self.device_manager_view)
if __name__ == "__main__":
import sys
from qtpy.QtWidgets import QApplication
app = QApplication(sys.argv)
device_manager = DeviceManagerWidget()
# config = device_manager.client.device_manager._get_redis_device_config()
# device_manager.device_table_view.set_device_config(config)
device_manager.show()
device_manager.setWindowTitle("Device Manager View")
device_manager.resize(1600, 1200)
# developer_view.set_stretch(horizontal=[1, 3, 2], vertical=[5, 5]) #can be set during runtime
sys.exit(app.exec_())
File diff suppressed because one or more lines are too long
@@ -1,4 +0,0 @@
from .device_table_view import DeviceTableView
from .dm_config_view import DMConfigView
from .dm_docstring_view import DocstringView
from .dm_ophyd_test import DMOphydTest
@@ -1,3 +0,0 @@
from .available_device_resources import AvailableDeviceResources
__all__ = ["AvailableDeviceResources"]
@@ -1,87 +0,0 @@
from random import randint
from typing import Any, Callable, Generator, Iterable, TypeVar
from qtpy.QtCore import QSize
from qtpy.QtWidgets import QListWidgetItem, QWidget
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.widgets.control.device_manager.components.available_device_resources.available_device_resources_ui import (
Ui_availableDeviceResources,
)
from bec_widgets.widgets.control.device_manager.components.available_device_resources.device_resource_backend import (
HashableDevice,
get_backend,
)
from bec_widgets.widgets.control.device_manager.components.available_device_resources.device_tag_group import (
DeviceTagGroup,
)
_T = TypeVar("_T")
_RT = TypeVar("_RT")
def _yield_only_passing(fn: Callable[[_T], _RT], vals: Iterable[_T]) -> Generator[_RT, Any, None]:
for v in vals:
try:
yield fn(v)
except BaseException:
pass
class AvailableDeviceResources(BECWidget, QWidget, Ui_availableDeviceResources):
def __init__(self, parent=None, **kwargs):
super().__init__(parent=parent, **kwargs)
self.setupUi(self)
self._backend = get_backend()
self._items: dict[str, tuple[QListWidgetItem, DeviceTagGroup]] = {}
self.refresh_full_list()
def refresh_full_list(self):
self.tag_groups_list.clear()
self._items = {}
for tag_group, devices in self._backend.tag_groups.items():
self._add_tag_group(tag_group, devices)
self._add_tag_group("Untagged devices", self._backend.untagged_devices)
def _add_tag_group(self, tag_group: str, devices: set[HashableDevice]):
item = QListWidgetItem(self.tag_groups_list)
tag_group_widget = DeviceTagGroup(self.tag_groups_list, tag_group, devices)
self.tag_groups_list.setItemWidget(item, tag_group_widget)
self.tag_groups_list.addItem(item)
self._items[tag_group] = (item, tag_group_widget)
item.setSizeHint(QSize(tag_group_widget.width(), tag_group_widget.height()))
def _reset_devices_state(self):
for _, tag_group in self._items.values():
tag_group.reset_devices_state()
def set_devices_state(self, devices: Iterable[HashableDevice], included: bool):
for device in devices:
for _, tag_group in self._items.values():
tag_group.set_item_state(hash(device), included)
def resizeEvent(self, event):
super().resizeEvent(event)
for list_item, tag_group_widget in self._items.values():
list_item.setSizeHint(tag_group_widget.sizeHint())
@SafeSlot(list)
def update_devices_state(self, config_list: list[dict[str, Any]]):
self.set_devices_state(
_yield_only_passing(HashableDevice.model_validate, config_list), True
)
if __name__ == "__main__":
import sys
from qtpy.QtWidgets import QApplication
app = QApplication(sys.argv)
widget = AvailableDeviceResources()
widget.set_devices_state(
list(filter(lambda _: randint(0, 1) == 1, widget._backend.all_devices)), True
)
widget.show()
sys.exit(app.exec())
@@ -1,27 +0,0 @@
from qtpy.QtCore import QMetaObject, Qt
from qtpy.QtWidgets import QAbstractItemView, QListView, QListWidget, QVBoxLayout
class Ui_availableDeviceResources(object):
def setupUi(self, availableDeviceResources):
if not availableDeviceResources.objectName():
availableDeviceResources.setObjectName("availableDeviceResources")
self.verticalLayout = QVBoxLayout(availableDeviceResources)
self.verticalLayout.setObjectName("verticalLayout")
self.tag_groups_list = QListWidget(availableDeviceResources)
self.tag_groups_list.setObjectName("tag_groups_list")
self.tag_groups_list.setSelectionMode(QAbstractItemView.SelectionMode.NoSelection)
self.tag_groups_list.setVerticalScrollMode(QAbstractItemView.ScrollMode.ScrollPerPixel)
self.tag_groups_list.setHorizontalScrollMode(QAbstractItemView.ScrollMode.ScrollPerPixel)
self.tag_groups_list.setMovement(QListView.Movement.Static)
self.tag_groups_list.setSpacing(2)
self.tag_groups_list.setDragDropMode(QListWidget.DragDropMode.DragOnly)
self.tag_groups_list.setDragEnabled(True)
self.tag_groups_list.setAcceptDrops(False)
self.tag_groups_list.setHorizontalScrollBarPolicy(Qt.ScrollBarPolicy.ScrollBarAlwaysOff)
availableDeviceResources.setMinimumWidth(250)
availableDeviceResources.resize(250, availableDeviceResources.height())
self.verticalLayout.addWidget(self.tag_groups_list)
QMetaObject.connectSlotsByName(availableDeviceResources)
@@ -1,160 +0,0 @@
from __future__ import annotations
import operator
from functools import reduce
from glob import glob
from pathlib import Path
from textwrap import dedent
from typing import AbstractSet, Protocol
from bec_lib.atlas_models import Device
from bec_lib.bec_yaml_loader import yaml_load
from bec_lib.plugin_helper import plugin_package_name, plugin_repo_path
from pydantic import model_validator
class HashableDevice(Device):
source_files: set[str] = set()
names: set[str] = set()
@model_validator(mode="after")
def add_name(self) -> HashableDevice:
self.names.add(self.name)
return self
def as_normal_device(self):
return Device.model_validate(self)
def __hash__(self) -> int:
config_values = sorted(
(str(kv) for kv in self.deviceConfig.items()) if self.deviceConfig else []
)
return (reduce(operator.add, (self.name, self.deviceClass, *config_values))).__hash__()
def __eq__(self, value: object) -> bool:
if not isinstance(value, self.__class__):
return False
if hash(self) == hash(value):
return True
return False
def rich_text(self) -> str:
return dedent(
f"""
<b><u><h2> {self.name}: </h2></u></b>
<table>
<tr><td> description: </td><td><i> {self.description} </i></td></tr>
<tr><td> config: </td><td><i> {self.deviceConfig} </i></td></tr>
<tr><td> enabled: </td><td><i> {self.enabled} </i></td></tr>
<tr><td> read only: </td><td><i> {self.readOnly} </i></td></tr>
</table>
"""
)
def add_sources(self, other: HashableDevice):
self.source_files.update(other.source_files)
def add_tags(self, other: HashableDevice):
self.deviceTags.update(other.deviceTags)
def add_names(self, other: HashableDevice):
self.names.update(other.names)
class _HashableDeviceSet(set):
def __or__(self, value: AbstractSet) -> _HashableDeviceSet:
for item in self:
if item in value:
for other_item in value:
if other_item == item:
item.add_sources(other_item)
item.add_tags(other_item)
item.add_names(other_item)
for other_item in value:
if other_item not in self:
self.add(other_item)
return self
class DeviceResourceBackend(Protocol):
@property
def tag_groups(self) -> dict[str, set[HashableDevice]]:
"""A dictionary of all availble devices separated by tag groups. The same device may
appear more than once (in different groups)."""
...
@property
def all_devices(self) -> set[HashableDevice]:
"""A set of all availble devices. The same device may not appear more than once."""
...
@property
def untagged_devices(self) -> set[HashableDevice]:
"""A set of all untagged devices. The same device may not appear more than once."""
...
def tags(self) -> set[str]:
"""Returns a set of all the tags in all available devices."""
...
def tag_group(self, tag: str) -> set[HashableDevice]:
"""Returns a set of the devices in the tag group with the given key."""
...
def _devices_from_file(file: str, include_source: bool = True):
data = yaml_load(file, process_includes=False)
return _HashableDeviceSet(
HashableDevice.model_validate(
dev | {"name": name, "source_files": {file} if include_source else set()}
)
for name, dev in data.items()
)
class _ConfigFileBackend(DeviceResourceBackend):
def __init__(self) -> None:
self._raw_device_set: set[
HashableDevice
] = self._get_config_from_backup_file() or self._get_configs_from_plugin_files(
Path(plugin_repo_path()) / plugin_package_name() / "device_configs/"
)
self._tag_groups = self._get_tag_groups()
def _get_config_from_backup_file(self):
return None
# return _devices_from_file(
# "/home/perl_d/Development/bec/bec/logs/device_configs/recovery_configs/recovery_config_2025-08-22_14-02-29.yaml"
# )
def _get_configs_from_plugin_files(self, dir: Path):
files = glob("*.yaml", root_dir=dir, recursive=True)
return reduce(operator.or_, map(_devices_from_file, (str(dir / f) for f in files)))
def _get_tag_groups(self) -> dict[str, set[HashableDevice]]:
return {
tag: set(filter(lambda dev: tag in dev.deviceTags, self._raw_device_set))
for tag in self.tags()
}
@property
def tag_groups(self):
return self._tag_groups
@property
def all_devices(self):
return self._raw_device_set
@property
def untagged_devices(self):
return {d for d in self._raw_device_set if d.deviceTags == set()}
def tags(self) -> set[str]:
return reduce(operator.or_, (dev.deviceTags for dev in self._raw_device_set))
def tag_group(self, tag: str) -> set[HashableDevice]:
return self.tag_groups[tag]
def get_backend() -> DeviceResourceBackend:
return _ConfigFileBackend()
@@ -1,189 +0,0 @@
from typing import NamedTuple
from bec_qthemes import material_icon
from qtpy.QtCore import QSize
from qtpy.QtWidgets import QFrame, QHBoxLayout, QLabel, QListWidgetItem, QVBoxLayout, QWidget
from bec_widgets.widgets.control.device_manager.components.available_device_resources.device_resource_backend import (
HashableDevice,
)
from bec_widgets.widgets.control.device_manager.components.available_device_resources.device_tag_group_item_ui import (
Ui_DeviceTagGroup,
)
DEVICE_HASH_ROLE = 101
def _warning_string(spec: HashableDevice):
names_str = "\n ".join(spec.names)
msg = (
f"Device defined with multiple names! Please check:\n {names_str}\n"
if len(spec.names) > 1
else ""
)
source_str = "\n ".join(spec.source_files)
source_warning = (
f"Device found in multiple source files! Please check:\n {source_str}"
if len(spec.source_files) > 1
else ""
)
return f"{msg}{source_warning}"
class _DeviceEntryWidget(QFrame):
_grid_size = QSize(120, 80)
def __init__(self, device_spec: HashableDevice, parent=None, **kwargs):
super().__init__(parent, **kwargs)
self._device_spec = device_spec
self.included: bool = False
self.setFrameShape(QFrame.Shape.StyledPanel)
self.setFrameShadow(QFrame.Shadow.Raised)
self._layout = QVBoxLayout()
self._layout.setContentsMargins(5, 5, 5, 5)
self.setLayout(self._layout)
self.setMinimumSize(self._grid_size)
self.setup_title_layout(device_spec)
self.check_and_display_warning()
self.setToolTip(device_spec.rich_text())
self.details = QLabel(f"Tags:\n{', '.join(device_spec.deviceTags)}")
self.details.setStyleSheet("QLabel { font-size: 8pt; }")
self.details.setWordWrap(True)
self._layout.addWidget(self.details)
def setup_title_layout(self, device_spec: HashableDevice):
self._title_layout = QHBoxLayout()
self._title_layout.setContentsMargins(0, 0, 0, 0)
self._title_container = QWidget(parent=self)
self._title_container.setLayout(self._title_layout)
self._warning_label = QLabel()
self._title_layout.addWidget(self._warning_label)
self.title = QLabel(device_spec.name)
self.title.setToolTip(device_spec.name)
self.title.setStyleSheet(self.title_style("#FF0000"))
self._title_layout.addWidget(self.title)
self._layout.addWidget(self._title_container)
def check_and_display_warning(self):
if len(self._device_spec.names) == 1 and len(self._device_spec.source_files) == 1:
self._warning_label.setText("")
self._warning_label.setToolTip("")
else:
self._warning_label.setPixmap(material_icon("warning", size=(12, 12), color="#FFAA00"))
self._warning_label.setToolTip(_warning_string(self._device_spec))
@property
def device_hash(self):
return hash(self._device_spec)
def title_style(self, color: str) -> str:
return f"QLabel {{ color: {color}; font-weight: bold; font-size: 10pt; }}"
def setTitle(self, text: str):
self.title.setText(text)
def set_included(self, included: bool):
self.included = included
self.title.setStyleSheet(self.title_style("#00FF00" if included else "#FF0000"))
class _DeviceEntry(NamedTuple):
list_item: QListWidgetItem
widget: _DeviceEntryWidget
class DeviceTagGroup(QWidget, Ui_DeviceTagGroup):
def __init__(
self, parent=None, name: str = "TagGroupTitle", data: set[HashableDevice] = set(), **kwargs
):
super().__init__(parent=parent, **kwargs)
self.setupUi(self)
self.device_list.setGridSize(_DeviceEntryWidget._grid_size)
self.title.setText(name)
self._devices: dict[str, _DeviceEntry] = {}
for device in data:
self._add_item(device)
self.device_list.sortItems()
self._update_num_included()
self.add_to_composition_button.clicked.connect(self.test)
def _add_item(self, device: HashableDevice):
item = QListWidgetItem(self.device_list)
widget = _DeviceEntryWidget(device, self)
item.setSizeHint(QSize(widget.width(), widget.height()))
self.device_list.setItemWidget(item, widget)
self.device_list.addItem(item)
self._devices[device.name] = _DeviceEntry(item, widget)
def reset_devices_state(self):
for dev in self._devices.values():
dev.widget.set_included(False)
self._update_num_included()
def set_item_state(self, /, device_hash: int, included: bool):
for dev in self._devices.values():
if dev.widget.device_hash == device_hash:
dev.widget.set_included(included)
self._update_num_included()
def _update_num_included(self):
n_included = sum(int(dev.widget.included) for dev in self._devices.values())
if n_included == 0:
color = "#FF0000"
elif n_included == len(self._devices):
color = "#00FF00"
else:
color = "#FFAA00"
self.n_included.setText(f"{n_included} / {len(self._devices)}")
self.n_included.setStyleSheet(f"QLabel {{ color: {color}; }}")
def resizeEvent(self, event):
super().resizeEvent(event)
self.setMinimumHeight(self.sizeHint().height())
self.setMaximumHeight(self.sizeHint().height())
def get_selection(self) -> set[HashableDevice]:
selection = self.device_list.selectedItems()
widgets = (w.widget for _, w in self._devices.items() if w.list_item in selection)
return set(w._device_spec for w in widgets)
def test(self, *args):
print(self.get_selection())
def __repr__(self) -> str:
return f"{self.__class__.__name__}: {self.title.text()}"
if __name__ == "__main__":
import sys
from qtpy.QtWidgets import QApplication
app = QApplication(sys.argv)
widget = DeviceTagGroup(name="Tag group 1")
for item in [
HashableDevice(
**{
"name": f"test_device_{i}",
"deviceClass": "TestDeviceClass",
"readoutPriority": "baseline",
"enabled": True,
}
)
for i in range(5)
]:
widget._add_item(item)
widget._update_num_included()
widget.show()
sys.exit(app.exec())
@@ -1,135 +0,0 @@
import math
from functools import partial
from bec_qthemes import material_icon
from qtpy.QtCore import QMetaObject, QSize, Qt
from qtpy.QtWidgets import (
QAbstractItemView,
QFrame,
QHBoxLayout,
QLabel,
QListView,
QListWidget,
QSizePolicy,
QSpacerItem,
QToolButton,
QVBoxLayout,
)
class AutoHeightListWidget(QListWidget):
def __init__(self, parent=None):
super().__init__(parent)
self.setViewMode(QListView.ViewMode.IconMode)
self.setResizeMode(QListView.ResizeMode.Adjust)
self.setWrapping(True)
self.setUniformItemSizes(True)
self.setMovement(QListView.Movement.Static)
self.setAcceptDrops(False)
self.setDragEnabled(True)
self.setSelectionMode(QAbstractItemView.SelectionMode.ExtendedSelection)
self.setSpacing(5)
self.setVerticalScrollBarPolicy(Qt.ScrollBarPolicy.ScrollBarAlwaysOff)
self.setHorizontalScrollBarPolicy(Qt.ScrollBarPolicy.ScrollBarAlwaysOff)
def resizeEvent(self, event):
super().resizeEvent(event)
self.setMinimumHeight(self._calcSize().height())
self.setMaximumHeight(self._calcSize().height())
def sizeHint(self):
return self._calcSize()
def minimumSizeHint(self):
return self._calcSize()
def _calcSize(self):
if self.count() == 0:
return super().sizeHint()
grid = self.gridSize()
if not grid.isValid():
grid = QSize(100, 100) # fallback
items_per_row = max(1, self.viewport().width() // grid.width())
rows = math.ceil(self.count() / items_per_row)
height = rows * grid.height() + 2 * self.frameWidth()
return QSize(self.viewport().width(), height)
class Ui_DeviceTagGroup(object):
def setupUi(self, DeviceTagGroup):
if not DeviceTagGroup.objectName():
DeviceTagGroup.setObjectName("DeviceTagGroup")
DeviceTagGroup.setMinimumWidth(150)
self.verticalLayout = QVBoxLayout(DeviceTagGroup)
self.verticalLayout.setObjectName("verticalLayout")
self.frame = QFrame(DeviceTagGroup)
self.frame.setObjectName("frame")
self.frame.setFrameShape(QFrame.Shape.StyledPanel)
self.frame.setFrameShadow(QFrame.Shadow.Raised)
self.verticalLayout_2 = QVBoxLayout(self.frame)
self.verticalLayout_2.setObjectName("verticalLayout_2")
self.horizontalLayout = QHBoxLayout()
self.horizontalLayout.setObjectName("horizontalLayout")
self.title = QLabel(self.frame)
self.title.setObjectName("title")
self.horizontalLayout.addWidget(self.title)
self.n_included = QLabel(self.frame, text="...")
self.n_included.setObjectName("n_included")
self.horizontalLayout.addWidget(self.n_included)
self.horizontalSpacer = QSpacerItem(
40, 20, QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Minimum
)
self.horizontalLayout.addItem(self.horizontalSpacer)
self.delete_tag_button = QToolButton(self.frame)
self.delete_tag_button.setObjectName("delete_tag_button")
self.horizontalLayout.addWidget(self.delete_tag_button)
self.remove_from_composition_button = QToolButton(self.frame)
self.remove_from_composition_button.setObjectName("remove_from_composition_button")
self.horizontalLayout.addWidget(self.remove_from_composition_button)
self.add_to_composition_button = QToolButton(self.frame)
self.add_to_composition_button.setObjectName("add_to_composition_button")
self.horizontalLayout.addWidget(self.add_to_composition_button)
self.remove_all_button = QToolButton(self.frame)
self.remove_all_button.setObjectName("remove_all_from_composition_button")
self.horizontalLayout.addWidget(self.remove_all_button)
self.add_all_button = QToolButton(self.frame)
self.add_all_button.setObjectName("add_all_to_composition_button")
self.horizontalLayout.addWidget(self.add_all_button)
self.verticalLayout_2.addLayout(self.horizontalLayout)
self.device_list = AutoHeightListWidget(self.frame)
self.device_list.setObjectName("device_list")
self.verticalLayout_2.addWidget(self.device_list)
self.verticalLayout.addWidget(self.frame)
self.set_icons()
QMetaObject.connectSlotsByName(DeviceTagGroup)
def set_icons(self):
icon = partial(material_icon, size=(15, 15), convert_to_pixmap=False)
self.delete_tag_button.setIcon(icon("delete"))
self.delete_tag_button.setToolTip("Delete tag group")
self.remove_from_composition_button.setIcon(icon("remove"))
self.remove_from_composition_button.setToolTip("Remove selected from composition")
self.add_to_composition_button.setIcon(icon("add"))
self.add_to_composition_button.setToolTip("Add selected to composition")
self.remove_all_button.setIcon(icon("chips"))
self.remove_all_button.setToolTip("Remove all with this tag from composition")
self.add_all_button.setIcon(icon("add_box"))
self.add_all_button.setToolTip("Add all with this tag to composition")
@@ -3,18 +3,16 @@
from __future__ import annotations
import copy
import time
import json
from bec_lib.logger import bec_logger
from bec_qthemes import material_icon
from qtpy import QtCore, QtGui, QtWidgets
from thefuzz import fuzz
from bec_widgets.utils.bec_signal_proxy import BECSignalProxy
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.colors import get_accent_colors, get_theme_palette
from bec_widgets.utils.colors import get_accent_colors
from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.widgets.control.device_manager.components.dm_ophyd_test import ValidationStatus
logger = bec_logger.logger
@@ -25,32 +23,34 @@ FUZZY_SEARCH_THRESHOLD = 80
class DictToolTipDelegate(QtWidgets.QStyledItemDelegate):
"""Delegate that shows all key-value pairs of a rows's data as a YAML-like tooltip."""
@staticmethod
def dict_to_str(d: dict) -> str:
"""Convert a dictionary to a formatted string."""
return json.dumps(d, indent=4)
def helpEvent(self, event, view, option, index):
"""Override to show tooltip when hovering."""
if event.type() != QtCore.QEvent.ToolTip:
return super().helpEvent(event, view, option, index)
model: DeviceFilterProxyModel = index.model()
model_index = model.mapToSource(index)
row_dict = model.sourceModel().get_row_data(model_index)
description = row_dict.get("description", "")
QtWidgets.QToolTip.showText(event.globalPos(), description, view)
row_dict = model.sourceModel().row_data(model_index)
row_dict.pop("description", None)
QtWidgets.QToolTip.showText(event.globalPos(), self.dict_to_str(row_dict), view)
return True
class CenterCheckBoxDelegate(DictToolTipDelegate):
"""Custom checkbox delegate to center checkboxes in table cells."""
def __init__(self, parent=None, colors=None):
def __init__(self, parent=None):
super().__init__(parent)
self._colors = colors if colors else get_accent_colors()
colors = get_accent_colors()
self._icon_checked = material_icon(
"check_box", size=QtCore.QSize(16, 16), color=self._colors.default, filled=True
"check_box", size=QtCore.QSize(16, 16), color=colors.default
)
self._icon_unchecked = material_icon(
"check_box_outline_blank",
size=QtCore.QSize(16, 16),
color=self._colors.default,
filled=True,
"check_box_outline_blank", size=QtCore.QSize(16, 16), color=colors.default
)
def apply_theme(self, theme: str | None = None):
@@ -81,51 +81,9 @@ class CenterCheckBoxDelegate(DictToolTipDelegate):
return model.setData(index, new_state, QtCore.Qt.CheckStateRole)
class DeviceValidatedDelegate(DictToolTipDelegate):
"""Custom delegate for displaying validated device configurations."""
def __init__(self, parent=None, colors=None):
super().__init__(parent)
self._colors = colors if colors else get_accent_colors()
self._icons = {
ValidationStatus.PENDING: material_icon(
icon_name="circle", size=(12, 12), color=self._colors.default, filled=True
),
ValidationStatus.VALID: material_icon(
icon_name="circle", size=(12, 12), color=self._colors.success, filled=True
),
ValidationStatus.FAILED: material_icon(
icon_name="circle", size=(12, 12), color=self._colors.emergency, filled=True
),
}
def apply_theme(self, theme: str | None = None):
colors = get_accent_colors()
for status, icon in self._icons.items():
icon.setColor(colors[status])
def paint(self, painter, option, index):
status = index.model().data(index, QtCore.Qt.DisplayRole)
if status is None:
return super().paint(painter, option, index)
pixmap = self._icons.get(status)
if pixmap:
rect = option.rect
pix_rect = pixmap.rect()
pix_rect.moveCenter(rect.center())
painter.drawPixmap(pix_rect.topLeft(), pixmap)
super().paint(painter, option, index)
class WrappingTextDelegate(DictToolTipDelegate):
"""Custom delegate for wrapping text in table cells."""
def __init__(self, table: BECTableView, parent=None):
super().__init__(parent)
self._table = table
def paint(self, painter, option, index):
text = index.model().data(index, QtCore.Qt.DisplayRole)
if not text:
@@ -139,14 +97,12 @@ class WrappingTextDelegate(DictToolTipDelegate):
def sizeHint(self, option, index):
text = str(index.model().data(index, QtCore.Qt.DisplayRole) or "")
column_width = self._table.columnWidth(index.column()) - 8 # -4 & 4
# if not text:
# return super().sizeHint(option, index)
# Avoid pathological heights for too-narrow columns
min_width = option.fontMetrics.averageCharWidth() * 4
if column_width < min_width:
fm = QtGui.QFontMetrics(option.font)
elided = fm.elidedText(text, QtCore.Qt.ElideRight, column_width)
return QtCore.QSize(column_width, fm.height() + 4)
# Use the actual column width
table = index.model().parent() # or store reference to QTableView
column_width = table.columnWidth(index.column()) # - 8
doc = QtGui.QTextDocument()
doc.setDefaultFont(option.font)
@@ -154,25 +110,8 @@ class WrappingTextDelegate(DictToolTipDelegate):
doc.setPlainText(text)
layout_height = doc.documentLayout().documentSize().height()
return QtCore.QSize(column_width, int(layout_height) + 4)
# def sizeHint(self, option, index):
# text = str(index.model().data(index, QtCore.Qt.DisplayRole) or "")
# # if not text:
# # return super().sizeHint(option, index)
# # Use the actual column width
# table = index.model().parent() # or store reference to QTableView
# column_width = table.columnWidth(index.column()) # - 8
# doc = QtGui.QTextDocument()
# doc.setDefaultFont(option.font)
# doc.setTextWidth(column_width)
# doc.setPlainText(text)
# layout_height = doc.documentLayout().documentSize().height()
# height = int(layout_height) + 4 # Needs some extra padding, otherwise it gets cut off
# return QtCore.QSize(column_width, height)
height = int(layout_height) + 4 # Needs some extra padding, otherwise it gets cut off
return QtCore.QSize(column_width, height)
class DeviceTableModel(QtCore.QAbstractTableModel):
@@ -182,22 +121,17 @@ class DeviceTableModel(QtCore.QAbstractTableModel):
Sort logic is implemented directly on the data of the table view.
"""
device_configs_added = QtCore.Signal(dict) # Dict[str, dict] of configs that were added
devices_removed = QtCore.Signal(list) # List of strings with device names that were removed
def __init__(self, parent=None):
def __init__(self, device_config: list[dict] | None = None, parent=None):
super().__init__(parent)
self._device_config: dict[str, dict] = {}
self._list_items: list[dict] = []
self._validation_status: dict[str, ValidationStatus] = {}
self._device_config = device_config or []
self.headers = [
"",
"name",
"deviceClass",
"readoutPriority",
"deviceTags",
"enabled",
"readOnly",
"deviceTags",
"description",
]
self._checkable_columns_enabled = {"enabled": True, "readOnly": True}
@@ -206,7 +140,7 @@ class DeviceTableModel(QtCore.QAbstractTableModel):
###############################################
def rowCount(self, parent=QtCore.QModelIndex()) -> int:
return len(self._list_items)
return len(self._device_config)
def columnCount(self, parent=QtCore.QModelIndex()) -> int:
return len(self.headers)
@@ -216,32 +150,25 @@ class DeviceTableModel(QtCore.QAbstractTableModel):
return self.headers[section]
return None
def get_row_data(self, index: QtCore.QModelIndex) -> dict:
def row_data(self, index: QtCore.QModelIndex) -> dict:
"""Return the row data for the given index."""
if not index.isValid():
return {}
return copy.deepcopy(self._list_items[index.row()])
return copy.deepcopy(self._device_config[index.row()])
def data(self, index, role=QtCore.Qt.DisplayRole):
"""Return data for the given index and role."""
if not index.isValid():
return None
row, col = index.row(), index.column()
if col == 0 and role == QtCore.Qt.DisplayRole: # QtCore.Qt.DisplayRole:
dev_name = self._list_items[row].get("name", "")
return self._validation_status.get(dev_name, ValidationStatus.PENDING)
key = self.headers[col]
value = self._list_items[row].get(key)
value = self._device_config[row].get(key)
if role == QtCore.Qt.DisplayRole:
if key in ("enabled", "readOnly"):
return bool(value)
if key == "deviceTags":
return ", ".join(str(tag) for tag in value) if value else ""
if key == "deviceClass":
return str(value).split(".")[-1]
return str(value) if value is not None else ""
if role == QtCore.Qt.CheckStateRole and key in ("enabled", "readOnly"):
return QtCore.Qt.Checked if value else QtCore.Qt.Unchecked
@@ -288,7 +215,7 @@ class DeviceTableModel(QtCore.QAbstractTableModel):
if key in ("enabled", "readOnly") and role == QtCore.Qt.CheckStateRole:
if not self._checkable_columns_enabled.get(key, True):
return False # ignore changes if column is disabled
self._list_items[row][key] = value == QtCore.Qt.Checked
self._device_config[row][key] = value == QtCore.Qt.Checked
self.dataChanged.emit(index, index, [QtCore.Qt.CheckStateRole])
return True
return False
@@ -297,115 +224,87 @@ class DeviceTableModel(QtCore.QAbstractTableModel):
############ Public methods ########
####################################
def get_device_config(self) -> dict[str, dict]:
"""Method to get the device configuration."""
def get_device_config(self) -> list[dict]:
"""Return the current device config (with checkbox updates applied)."""
return self._device_config
def add_device_configs(self, device_configs: dict[str, dict]):
def set_checkbox_enabled(self, column_name: str, enabled: bool):
"""
Add devices to the model.
Enable/Disable the checkbox column.
Args:
device_configs (dict[str, dict]): A dictionary of device configurations to add.
column_name (str): The name of the column to modify.
enabled (bool): Whether the checkbox should be enabled or disabled.
"""
already_in_list = []
for k, cfg in device_configs.items():
if k in self._device_config:
logger.warning(f"Device {k} already exists in the model.")
already_in_list.append(k)
continue
self._device_config[k] = cfg
new_list_cfg = copy.deepcopy(cfg)
new_list_cfg["name"] = k
row = len(self._list_items)
self.beginInsertRows(QtCore.QModelIndex(), row, row)
self._list_items.append(new_list_cfg)
self.endInsertRows()
for k in already_in_list:
device_configs.pop(k)
self.device_configs_added.emit(device_configs)
if column_name in self._checkable_columns_enabled:
self._checkable_columns_enabled[column_name] = enabled
col = self.headers.index(column_name)
top_left = self.index(0, col)
bottom_right = self.index(self.rowCount() - 1, col)
self.dataChanged.emit(
top_left, bottom_right, [QtCore.Qt.CheckStateRole, QtCore.Qt.DisplayRole]
)
def set_device_config(self, device_configs: dict[str, dict]):
def set_device_config(self, device_config: list[dict]):
"""
Replace the device config.
Args:
device_config (dict[str, dict]): The new device config to set.
device_config (list[dict]): The new device config to set.
"""
diff_names = set(device_configs.keys()) - set(self._device_config.keys())
self.beginResetModel()
self._device_config.clear()
self._list_items.clear()
for k, cfg in device_configs.items():
self._device_config[k] = cfg
new_list_cfg = copy.deepcopy(cfg)
new_list_cfg["name"] = k
self._list_items.append(new_list_cfg)
self._device_config = list(device_config)
self.endResetModel()
self.devices_removed.emit(diff_names)
self.device_configs_added.emit(device_configs)
def remove_device_configs(self, device_configs: dict[str, dict]):
@SafeSlot(dict)
def add_device(self, device: dict):
"""
Remove devices from the model.
Add an extra device to the device config at the bottom.
Args:
device_configs (dict[str, dict]): A dictionary of device configurations to remove.
device (dict): The device configuration to add.
"""
removed = []
for k in device_configs.keys():
if k not in self._device_config:
logger.warning(f"Device {k} does not exist in the model.")
continue
new_cfg = self._device_config.pop(k)
new_cfg["name"] = k
row = self._list_items.index(new_cfg)
row = len(self._device_config)
self.beginInsertRows(QtCore.QModelIndex(), row, row)
self._device_config.append(device)
self.endInsertRows()
@SafeSlot(int)
def remove_device_by_row(self, row: int):
"""
Remove one device row by index. This maps to the row to the source of the data model
Args:
row (int): The index of the device row to remove.
"""
if 0 <= row < len(self._device_config):
self.beginRemoveRows(QtCore.QModelIndex(), row, row)
self._list_items.pop(row)
self._device_config.pop(row)
self.endRemoveRows()
removed.append(k)
self.devices_removed.emit(removed)
def clear_table(self):
@SafeSlot(list)
def remove_devices_by_rows(self, rows: list[int]):
"""
Clear the table.
"""
device_names = list(self._device_config.keys())
self.beginResetModel()
self._device_config.clear()
self._list_items.clear()
self.endResetModel()
self.devices_removed.emit(device_names)
def update_validation_status(self, device_name: str, status: int | ValidationStatus):
"""
Handle device status changes.
Remove multiple device rows by their indices.
Args:
device_name (str): The name of the device.
status (int): The new status of the device.
rows (list[int]): The indices of the device rows to remove.
"""
if isinstance(status, int):
status = ValidationStatus(status)
if device_name not in self._device_config:
logger.warning(
f"Device {device_name} not found in device_config dict {self._device_config}"
)
return
self._validation_status[device_name] = status
row = None
for ii, item in enumerate(self._list_items):
if item["name"] == device_name:
row = ii
for row in sorted(rows, reverse=True):
self.remove_device_by_row(row)
@SafeSlot(str)
def remove_device_by_name(self, name: str):
"""
Remove one device row by name.
Args:
name (str): The name of the device to remove.
"""
for row, device in enumerate(self._device_config):
if device.get("name") == name:
self.remove_device_by_row(row)
break
if row is None:
logger.warning(
f"Device {device_name} not found in device_status dict {self._validation_status}"
)
return
# Emit dataChanged for column 0 (status column)
index = self.index(row, 0)
self.dataChanged.emit(index, index, [QtCore.Qt.DisplayRole])
class BECTableView(QtWidgets.QTableView):
@@ -425,7 +324,12 @@ class BECTableView(QtWidgets.QTableView):
if not proxy_indexes:
return
source_rows = self._get_source_rows(proxy_indexes)
# Get unique rows (proxy indices) in reverse order so removal indexes stay valid
proxy_rows = sorted({idx.row() for idx in proxy_indexes}, reverse=True)
# Map to source model rows
source_rows = [
self.model().mapToSource(self.model().index(row, 0)).row() for row in proxy_rows
]
model: DeviceTableModel = self.model().sourceModel() # access underlying model
# Delegate confirmation and removal to helper
@@ -433,28 +337,14 @@ class BECTableView(QtWidgets.QTableView):
if not removed:
return
def _get_source_rows(self, proxy_indexes: list[QtWidgets.QModelIndex]) -> list[int]:
"""
Map proxy model indices to source model row indices.
Args:
proxy_indexes (list[QModelIndex]): List of proxy model indices.
Returns:
list[int]: List of source model row indices.
"""
proxy_rows = sorted({idx for idx in proxy_indexes}, reverse=True)
source_rows = [self.model().mapToSource(idx).row() for idx in proxy_rows]
return list(set(source_rows))
def _confirm_and_remove_rows(self, model: DeviceTableModel, source_rows: list[int]) -> bool:
"""
Prompt the user to confirm removal of rows and remove them from the model if accepted.
Returns True if rows were removed, False otherwise.
"""
configs = [model._list_items[r] for r in sorted(source_rows)]
names = [cfg.get("name", "<unknown>") for cfg in configs]
cfg = model.get_device_config()
names = [str(cfg[r].get("name", "<unknown>")) for r in sorted(source_rows)]
msg = QtWidgets.QMessageBox(self)
msg.setIcon(QtWidgets.QMessageBox.Warning)
@@ -469,8 +359,8 @@ class BECTableView(QtWidgets.QTableView):
res = msg.exec_()
if res == QtWidgets.QMessageBox.Ok:
configs_to_be_removed = {model._device_config[name] for name in names}
model.remove_device_configs(configs_to_be_removed)
model.remove_devices_by_rows(source_rows)
# TODO add signal for removed devices
return True
return False
@@ -482,7 +372,7 @@ class DeviceFilterProxyModel(QtCore.QSortFilterProxyModel):
self._hidden_rows = set()
self._filter_text = ""
self._enable_fuzzy = True
self._filter_columns = [1, 2] # name and deviceClass for search
self._filter_columns = [0, 1] # name and deviceClass for search
def hide_rows(self, row_indices: list[int]):
"""
@@ -546,12 +436,9 @@ class DeviceFilterProxyModel(QtCore.QSortFilterProxyModel):
class DeviceTableView(BECWidget, QtWidgets.QWidget):
"""Device Table View for the device manager."""
selected_device = QtCore.Signal(dict) # Selected device configuration dict[str,dict]
device_configs_added = QtCore.Signal(dict) # Dict[str, dict] of configs that were added
devices_removed = QtCore.Signal(list) # List of strings with device names that were removed
RPC = False
PLUGIN = False
devices_removed = QtCore.Signal(list)
def __init__(self, parent=None, client=None):
super().__init__(client=client, parent=parent, theme_update=True)
@@ -568,10 +455,6 @@ class DeviceTableView(BECWidget, QtWidgets.QWidget):
self.layout.addLayout(self.search_controls)
self.layout.addWidget(self.table)
# Connect signals
self._model.devices_removed.connect(self.devices_removed.emit)
self._model.device_configs_added.connect(self.device_configs_added.emit)
def _setup_search(self):
"""Create components related to the search functionality"""
@@ -612,199 +495,137 @@ class DeviceTableView(BECWidget, QtWidgets.QWidget):
"""Setup the table view."""
# Model + Proxy
self.table = BECTableView(self)
self._model = DeviceTableModel(parent=self.table)
self.model = DeviceTableModel(parent=self.table)
self.proxy = DeviceFilterProxyModel(parent=self.table)
self.proxy.setSourceModel(self._model)
self.proxy.setSourceModel(self.model)
self.table.setModel(self.proxy)
self.table.setSortingEnabled(True)
# Delegates
colors = get_accent_colors()
self.checkbox_delegate = CenterCheckBoxDelegate(self.table, colors=colors)
self.checkbox_delegate = CenterCheckBoxDelegate(self.table)
self.wrap_delegate = WrappingTextDelegate(self.table)
self.tool_tip_delegate = DictToolTipDelegate(self.table)
self.validated_delegate = DeviceValidatedDelegate(self.table, colors=colors)
self.table.setItemDelegateForColumn(0, self.validated_delegate) # ValidationStatus
self.table.setItemDelegateForColumn(1, self.tool_tip_delegate) # name
self.table.setItemDelegateForColumn(2, self.tool_tip_delegate) # deviceClass
self.table.setItemDelegateForColumn(3, self.tool_tip_delegate) # readoutPriority
self.table.setItemDelegateForColumn(4, self.wrap_delegate) # deviceTags
self.table.setItemDelegateForColumn(5, self.checkbox_delegate) # enabled
self.table.setItemDelegateForColumn(6, self.checkbox_delegate) # readOnly
self.table.setItemDelegateForColumn(0, self.tool_tip_delegate) # name
self.table.setItemDelegateForColumn(1, self.tool_tip_delegate) # deviceClass
self.table.setItemDelegateForColumn(2, self.tool_tip_delegate) # readoutPriority
self.table.setItemDelegateForColumn(3, self.checkbox_delegate) # enabled
self.table.setItemDelegateForColumn(4, self.checkbox_delegate) # readOnly
self.table.setItemDelegateForColumn(5, self.wrap_delegate) # deviceTags
self.table.setItemDelegateForColumn(6, self.wrap_delegate) # description
# Column resize policies
# TODO maybe we need here a flexible header options as deviceClass
# may get quite long for beamlines plugin repos
header = self.table.horizontalHeader()
header.setSectionResizeMode(0, QtWidgets.QHeaderView.Fixed) # ValidationStatus
header.setSectionResizeMode(1, QtWidgets.QHeaderView.ResizeToContents) # name
header.setSectionResizeMode(2, QtWidgets.QHeaderView.ResizeToContents) # deviceClass
header.setSectionResizeMode(3, QtWidgets.QHeaderView.ResizeToContents) # readoutPriority
header.setSectionResizeMode(4, QtWidgets.QHeaderView.Stretch) # deviceTags
header.setSectionResizeMode(5, QtWidgets.QHeaderView.Fixed) # enabled
header.setSectionResizeMode(6, QtWidgets.QHeaderView.Fixed) # readOnly
self.table.setColumnWidth(0, 25)
self.table.setColumnWidth(5, 70)
self.table.setColumnWidth(6, 70)
header.setSectionResizeMode(0, QtWidgets.QHeaderView.ResizeToContents) # name
header.setSectionResizeMode(1, QtWidgets.QHeaderView.ResizeToContents) # deviceClass
header.setSectionResizeMode(2, QtWidgets.QHeaderView.ResizeToContents) # readoutPriority
header.setSectionResizeMode(3, QtWidgets.QHeaderView.Fixed) # enabled
header.setSectionResizeMode(4, QtWidgets.QHeaderView.Fixed) # readOnly
# TODO maybe better stretch...
header.setSectionResizeMode(5, QtWidgets.QHeaderView.ResizeToContents) # deviceTags
header.setSectionResizeMode(6, QtWidgets.QHeaderView.Stretch) # description
self.table.setColumnWidth(3, 82)
self.table.setColumnWidth(4, 82)
# Ensure column widths stay fixed
header.setMinimumSectionSize(25)
header.setMinimumSectionSize(70)
header.setDefaultSectionSize(90)
# Enable resizing of column
self._geometry_resize_proxy = BECSignalProxy(
header.geometriesChanged, rateLimit=10, slot=self._on_table_resized
)
header.sectionResized.connect(self.on_table_resized)
# Selection behavior
self.table.setSelectionBehavior(QtWidgets.QAbstractItemView.SelectRows)
self.table.setSelectionMode(QtWidgets.QAbstractItemView.ExtendedSelection)
# Connect to selection model to get selection changes
self.table.selectionModel().selectionChanged.connect(self._on_selection_changed)
self.table.horizontalHeader().setHighlightSections(False)
# QtCore.QTimer.singleShot(0, lambda: header.sectionResized.emit(0, 0, 0))
def get_device_config(self) -> dict[str, dict]:
def device_config(self) -> list[dict]:
"""Get the device config."""
return self._model.get_device_config()
return self.model.get_device_config()
def apply_theme(self, theme: str | None = None):
self.checkbox_delegate.apply_theme(theme)
self.validated_delegate.apply_theme(theme)
######################################
########### Slot API #################
######################################
@SafeSlot()
def _on_table_resized(self, *args):
@SafeSlot(int, int, int)
def on_table_resized(self, column, old_width, new_width):
"""Handle changes to the table column resizing."""
option = QtWidgets.QStyleOptionViewItem()
model = self.table.model()
for row in range(model.rowCount()):
index = model.index(row, 4)
height = self.wrap_delegate.sizeHint(option, index).height()
self.table.setRowHeight(row, height)
@SafeSlot(QtCore.QItemSelection, QtCore.QItemSelection)
def _on_selection_changed(
self, selected: QtCore.QItemSelection, deselected: QtCore.QItemSelection
) -> None:
"""
Handle selection changes in the device table.
Args:
selected (QtCore.QItemSelection): The selected items.
deselected (QtCore.QItemSelection): The deselected items.
"""
# TODO also hook up logic if a config update is propagated from somewhere!
# selected_indexes = selected.indexes()
selected_indexes = self.table.selectionModel().selectedIndexes()
if not selected_indexes:
if column != len(self.model.headers) - 1:
return
source_indexes = [self.proxy.mapToSource(idx) for idx in selected_indexes]
source_rows = {idx.row() for idx in source_indexes}
configs = [copy.deepcopy(self._model._list_items[r]) for r in sorted(source_rows)]
names = [cfg.pop("name") for cfg in configs]
selected_cfgs = {name: cfg for name, cfg in zip(names, configs)}
self.selected_device.emit(selected_cfgs)
for row in range(self.table.model().rowCount()):
index = self.table.model().index(row, column)
delegate = self.table.itemDelegate(index)
option = QtWidgets.QStyleOptionViewItem()
height = delegate.sizeHint(option, index).height()
self.table.setRowHeight(row, height)
######################################
##### Ext. Slot API #################
######################################
@SafeSlot(dict)
def set_device_config(self, device_configs: dict[str, dict]):
@SafeSlot(list)
def set_device_config(self, config: list[dict]):
"""
Set the device config.
Args:
config (dict[str,dict]): The device config to set.
config (list[dict]): The device config to set.
"""
self._model.set_device_config(device_configs)
self.model.set_device_config(config)
@SafeSlot()
def clear_device_configs(self):
"""Clear the device configs."""
self._model.clear_table()
def clear_device_config(self):
"""
Clear the device config.
"""
self.model.set_device_config([])
@SafeSlot(dict)
def add_device_configs(self, device_configs: dict[str, dict]):
def add_device(self, device: dict):
"""
Add devices to the config.
Add a device to the config.
Args:
device_configs (dict[str, dict]): The device configs to add.
device (dict): The device to add.
"""
self._model.add_device_configs(device_configs)
@SafeSlot(dict)
def remove_device_configs(self, device_configs: dict[str, dict]):
"""
Remove devices from the config.
Args:
device_configs (dict[str, dict]): The device configs to remove.
"""
self._model.remove_device_configs(device_configs)
self.model.add_device(device)
@SafeSlot(int)
@SafeSlot(str)
def remove_device(self, device_name: str):
def remove_device(self, dev: int | str):
"""
Remove a device from the config.
Remove the device from the config either by row id, or device name.
Args:
device_name (str): The name of the device to remove.
dev (int | str): The device to remove, either by row id or device name.
"""
cfg = self._model._device_config.get(device_name, None)
if cfg is None:
logger.warning(f"Device {device_name} not found in device_config dict")
if isinstance(dev, int):
# TODO test this properly, check with proxy index and source index
# Use the proxy model to map to the correct row
model_source_index = self.table.model().mapToSource(self.table.model().index(dev, 0))
self.model.remove_device_by_row(model_source_index.row())
return
if isinstance(dev, str):
self.model.remove_device_by_name(dev)
return
self._model.remove_device_configs({device_name: cfg})
@SafeSlot(str, int)
def update_device_validation(
self, device_name: str, validation_status: int | ValidationStatus
) -> None:
"""
Update the validation status of a device.
Args:
device_name (str): The name of the device.
validation_status (int | ValidationStatus): The new validation status.
"""
self._model.update_validation_status(device_name, validation_status)
if __name__ == "__main__":
import sys
import numpy as np
from qtpy.QtWidgets import QApplication
app = QApplication(sys.argv)
widget = QtWidgets.QWidget()
layout = QtWidgets.QVBoxLayout(widget)
layout.setContentsMargins(0, 0, 0, 0)
window = DeviceTableView()
layout.addWidget(window)
# QPushButton
button = QtWidgets.QPushButton("Test status_update")
layout.addWidget(button)
def _button_clicked():
names = list(window._model._device_config.keys())
for name in names:
window.update_device_validation(
name, ValidationStatus.VALID if np.random.rand() > 0.5 else ValidationStatus.FAILED
)
button.clicked.connect(_button_clicked)
# pylint: disable=protected-access
config = window.client.device_manager._get_redis_device_config()
names = [cfg.pop("name") for cfg in config]
config_dict = {name: cfg for name, cfg in zip(names, config)}
window.set_device_config(config_dict)
widget.show()
window.set_device_config(config)
window.show()
sys.exit(app.exec_())
@@ -1,83 +0,0 @@
"""Module with a config view for the device manager."""
from __future__ import annotations
import traceback
import yaml
from bec_lib.logger import bec_logger
from qtpy import QtCore, QtWidgets
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.colors import get_accent_colors, get_theme_palette
from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.widgets.editors.monaco.monaco_widget import MonacoWidget
logger = bec_logger.logger
class DMConfigView(BECWidget, QtWidgets.QWidget):
def __init__(self, parent=None, client=None):
super().__init__(client=client, parent=parent, theme_update=True)
self.stacked_layout = QtWidgets.QStackedLayout()
self.stacked_layout.setContentsMargins(0, 0, 0, 0)
self.stacked_layout.setSpacing(0)
self.setLayout(self.stacked_layout)
# Monaco widget
self.monaco_editor = MonacoWidget()
self._customize_monaco()
self.stacked_layout.addWidget(self.monaco_editor)
self._overlay_widget = QtWidgets.QLabel(text="Select single device to show config")
self._customize_overlay()
self.stacked_layout.addWidget(self._overlay_widget)
self.stacked_layout.setCurrentWidget(self._overlay_widget)
def _customize_monaco(self):
self.monaco_editor.set_language("yaml")
self.monaco_editor.set_vim_mode_enabled(False)
self.monaco_editor.set_minimap_enabled(False)
# self.monaco_editor.setFixedHeight(600)
self.monaco_editor.set_readonly(True)
self.monaco_editor.editor.set_scroll_beyond_last_line_enabled(False)
self.monaco_editor.editor.set_line_numbers_mode("off")
def _customize_overlay(self):
self._overlay_widget.setAlignment(QtCore.Qt.AlignmentFlag.AlignCenter)
self._overlay_widget.setAutoFillBackground(True)
self._overlay_widget.setSizePolicy(
QtWidgets.QSizePolicy.Policy.Expanding, QtWidgets.QSizePolicy.Policy.Expanding
)
@SafeSlot(dict)
def on_select_config(self, device: dict):
"""Handle selection of a device from the device table."""
if len(device) != 1:
text = ""
self.stacked_layout.setCurrentWidget(self._overlay_widget)
else:
try:
text = yaml.dump(device, default_flow_style=False)
self.stacked_layout.setCurrentWidget(self.monaco_editor)
except Exception:
content = traceback.format_exc()
logger.error(f"Error converting device to YAML:\n{content}")
text = ""
self.stacked_layout.setCurrentWidget(self._overlay_widget)
self.monaco_editor.set_readonly(False) # Enable editing
text = text.rstrip()
self.monaco_editor.set_text(text)
self.monaco_editor.set_readonly(True) # Disable editing again
if __name__ == "__main__":
import sys
from qtpy.QtWidgets import QApplication
app = QApplication(sys.argv)
config_view = DMConfigView()
config_view.show()
sys.exit(app.exec_())
@@ -1,128 +0,0 @@
"""Module to visualize the docstring of a device class."""
from __future__ import annotations
import inspect
import re
import traceback
from bec_lib.logger import bec_logger
from bec_lib.plugin_helper import get_plugin_class, plugin_package_name
from bec_lib.utils.rpc_utils import rgetattr
from qtpy import QtCore, QtWidgets
from bec_widgets.utils.error_popups import SafeSlot
logger = bec_logger.logger
try:
import ophyd
import ophyd_devices
READY_TO_VIEW = True
except ImportError:
logger.warning(f"Optional dependencies not available: {ImportError}")
ophyd_devices = None
ophyd = None
class DocstringView(QtWidgets.QTextEdit):
def __init__(self, parent: QtWidgets.QWidget | None = None):
super().__init__(parent)
self.setReadOnly(True)
self.setFocusPolicy(QtCore.Qt.NoFocus)
if not READY_TO_VIEW:
self._set_text("Ophyd or ophyd_devices not installed, cannot show docstrings.")
self.setEnabled(False)
return
def _format_docstring(self, doc: str | None) -> str:
if not doc:
return "<i>No docstring available.</i>"
# Escape HTML
doc = doc.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")
# Remove leading/trailing blank lines from the entire docstring
lines = [line.rstrip() for line in doc.splitlines()]
while lines and lines[0].strip() == "":
lines.pop(0)
while lines and lines[-1].strip() == "":
lines.pop()
doc = "\n".join(lines)
# Improved regex: match section header + all following indented lines
section_regex = re.compile(
r"(?m)^(Parameters|Args|Returns|Examples|Attributes|Raises)\b(?:\n([ \t]+.*))*",
re.MULTILINE,
)
def strip_section(match: re.Match) -> str:
# Capture all lines in the match
block = match.group(0)
lines = block.splitlines()
# Remove leading/trailing empty lines within the section
lines = [line for line in lines if line.strip() != ""]
return "\n".join(lines)
doc = section_regex.sub(strip_section, doc)
# Highlight section titles
doc = re.sub(
r"(?m)^(Parameters|Args|Returns|Examples|Attributes|Raises)\b", r"<b>\1</b>", doc
)
# Convert indented blocks to <pre> and strip leading/trailing newlines
def pre_block(match: re.Match) -> str:
text = match.group(0).strip("\n")
return f"<pre>{text}</pre>"
doc = re.sub(r"(?m)(?:\n[ \t]+.*)+", pre_block, doc)
# Replace remaining newlines with <br> and collapse multiple <br>
doc = doc.replace("\n", "<br>")
doc = re.sub(r"(<br>)+", r"<br>", doc)
doc = doc.strip("<br>")
return f"<div style='font-family: sans-serif; font-size: 12pt;'>{doc}</div>"
def _set_text(self, text: str):
self.setReadOnly(False)
self.setMarkdown(text)
# self.setHtml(self._format_docstring(text))
self.setReadOnly(True)
@SafeSlot(dict)
def on_select_config(self, device: dict):
if len(device) != 1:
self._set_text("")
return
k = next(iter(device))
device_class = device[k].get("deviceClass", "")
self.set_device_class(device_class)
@SafeSlot(str)
def set_device_class(self, device_class_str: str) -> None:
docstring = ""
if not READY_TO_VIEW:
return
try:
module_cls = get_plugin_class(device_class_str, [ophyd_devices, ophyd])
docstring = inspect.getdoc(module_cls)
self._set_text(docstring or "No docstring available.")
except Exception:
content = traceback.format_exc()
logger.error(f"Error retrieving docstring for {device_class_str}: {content}")
self._set_text(f"Error retrieving docstring for {device_class_str}")
if __name__ == "__main__":
import sys
from qtpy.QtWidgets import QApplication
app = QApplication(sys.argv)
config_view = DocstringView()
config_view.set_device_class("ophyd_devices.sim.sim_camera.SimCamera")
config_view.show()
sys.exit(app.exec_())
@@ -1,414 +0,0 @@
"""Module to run a static tests for devices from a yaml config."""
from __future__ import annotations
import enum
import re
import traceback
from html import escape
from typing import TYPE_CHECKING
import bec_lib
from bec_lib.logger import bec_logger
from bec_qthemes import material_icon
from ophyd import status
from qtpy import QtCore, QtGui, QtWidgets
from bec_widgets.utils.bec_widget import BECWidget
from bec_widgets.utils.colors import get_accent_colors
from bec_widgets.utils.error_popups import SafeProperty, SafeSlot
from bec_widgets.widgets.editors.web_console.web_console import WebConsole
from bec_widgets.widgets.utility.spinner.spinner import SpinnerWidget
READY_TO_TEST = False
logger = bec_logger.logger
try:
import bec_server
import ophyd_devices
READY_TO_TEST = True
except ImportError:
logger.warning(f"Optional dependencies not available: {ImportError}")
ophyd_devices = None
bec_server = None
if TYPE_CHECKING: # pragma no cover
try:
from ophyd_devices.utils.static_device_test import StaticDeviceTest
except ImportError:
StaticDeviceTest = None
class ValidationStatus(int, enum.Enum):
"""Validation status for device configurations."""
PENDING = 0 # colors.default
VALID = 1 # colors.highlight
FAILED = 2 # colors.emergency
class DeviceValidationResult(QtCore.QObject):
"""Simple object to inject validation signals into QRunnable."""
# Device validation signal, device_name, ValidationStatus as int, error message or ''
device_validated = QtCore.Signal(str, bool, str)
class DeviceValidationRunnable(QtCore.QRunnable):
"""Runnable for validating a device configuration."""
def __init__(
self,
device_name: str,
config: dict,
static_device_test: StaticDeviceTest | None,
connect: bool = False,
):
"""
Initialize the device validation runnable.
Args:
device_name (str): The name of the device to validate.
config (dict): The configuration dictionary for the device.
static_device_test (StaticDeviceTest): The static device test instance.
connect (bool, optional): Whether to connect to the device. Defaults to False.
"""
super().__init__()
self.device_name = device_name
self.config = config
self._connect = connect
self._static_device_test = static_device_test
self.signals = DeviceValidationResult()
def run(self):
"""Run method for device validation."""
if self._static_device_test is None:
logger.error(
f"Ophyd devices or bec_server not available, cannot run validation for device {self.device_name}."
)
return
try:
self._static_device_test.config = {self.device_name: self.config}
results = self._static_device_test.run_with_list_output(connect=self._connect)
success = results[0].success
msg = results[0].message
self.signals.device_validated.emit(self.device_name, success, msg)
except Exception:
content = traceback.format_exc()
logger.error(f"Validation failed for device {self.device_name}. Exception: {content}")
self.signals.device_validated.emit(self.device_name, False, content)
class ValidationListItem(QtWidgets.QWidget):
"""Custom list item widget showing device name and validation status."""
def __init__(self, device_name: str, device_config: dict, parent=None):
"""
Initialize the validation list item.
Args:
device_name (str): The name of the device.
device_config (dict): The configuration of the device.
validation_colors (dict[ValidationStatus, QtGui.QColor]): The colors for each validation status.
parent (QtWidgets.QWidget, optional): The parent widget.
"""
super().__init__(parent)
self.main_layout = QtWidgets.QHBoxLayout(self)
self.main_layout.setContentsMargins(2, 2, 2, 2)
self.main_layout.setSpacing(4)
self.device_name = device_name
self.device_config = device_config
self.validation_msg = "Validation in progress..."
self._setup_ui()
def _setup_ui(self):
"""Setup the UI for the list item."""
label = QtWidgets.QLabel(self.device_name)
self.main_layout.addWidget(label)
self.main_layout.addStretch()
self._spinner = SpinnerWidget(parent=self)
self._spinner.speed = 80
self._spinner.setFixedSize(24, 24)
self.main_layout.addWidget(self._spinner)
self._base_style = "font-weight: bold;"
self.setStyleSheet(self._base_style)
self._start_spinner()
def _start_spinner(self):
"""Start the spinner animation."""
self._spinner.start()
QtWidgets.QApplication.processEvents()
def _stop_spinner(self):
"""Stop the spinner animation."""
self._spinner.stop()
self._spinner.setVisible(False)
@SafeSlot()
def on_validation_restart(self):
"""Handle validation restart."""
self.validation_msg = ""
self._start_spinner()
self.setStyleSheet("") # Check if this works as expected
@SafeSlot(str)
def on_validation_failed(self, error_msg: str):
"""Handle validation failure."""
self.validation_msg = error_msg
colors = get_accent_colors()
self._stop_spinner()
self.main_layout.removeWidget(self._spinner)
self._spinner.deleteLater()
label = QtWidgets.QLabel("")
icon = material_icon("error", color=colors.emergency, size=(24, 24))
label.setPixmap(icon)
self.main_layout.addWidget(label)
class DMOphydTest(BECWidget, QtWidgets.QWidget):
"""Widget to test device configurations using ophyd devices."""
# Signal to emit the validation status of a device
device_validated = QtCore.Signal(str, int)
def __init__(self, parent=None, client=None):
super().__init__(parent=parent, client=client)
if not READY_TO_TEST:
self.setDisabled(True)
self.static_device_test = None
else:
from ophyd_devices.utils.static_device_test import StaticDeviceTest
self.static_device_test = StaticDeviceTest(config_dict={})
self._device_list_items: dict[str, QtWidgets.QListWidgetItem] = {}
self._thread_pool = QtCore.QThreadPool.globalInstance()
self._main_layout = QtWidgets.QVBoxLayout(self)
self._main_layout.setContentsMargins(0, 0, 0, 0)
self._main_layout.setSpacing(4)
# We add a splitter between the list and the text box
self.splitter = QtWidgets.QSplitter(QtCore.Qt.Orientation.Vertical)
self._main_layout.addWidget(self.splitter)
self._setup_list_ui()
self._setup_textbox_ui()
def _setup_list_ui(self):
"""Setup the list UI."""
self._list_widget = QtWidgets.QListWidget(self)
self._list_widget.setSelectionMode(QtWidgets.QAbstractItemView.SingleSelection)
self.splitter.addWidget(self._list_widget)
# Connect signals
self._list_widget.currentItemChanged.connect(self._on_current_item_changed)
def _setup_textbox_ui(self):
"""Setup the text box UI."""
self._text_box = QtWidgets.QTextEdit(self)
self._text_box.setReadOnly(True)
self._text_box.setFocusPolicy(QtCore.Qt.NoFocus)
self.splitter.addWidget(self._text_box)
@SafeSlot(dict)
def add_device_configs(self, device_configs: dict[str, dict]) -> None:
"""Receive an update with device configs.
Args:
device_configs (dict[str, dict]): The updated device configurations.
"""
for device_name, device_config in device_configs.items():
if device_name in self._device_list_items:
logger.error(f"Device {device_name} is already in the list.")
return
item = QtWidgets.QListWidgetItem(self._list_widget)
widget = ValidationListItem(device_name=device_name, device_config=device_config)
# wrap it in a QListWidgetItem
item.setSizeHint(widget.sizeHint())
self._list_widget.addItem(item)
self._list_widget.setItemWidget(item, widget)
self._device_list_items[device_name] = item
self._run_device_validation(widget)
@SafeSlot(dict)
def remove_device_configs(self, device_configs: dict[str, dict]) -> None:
"""Remove device configs from the list.
Args:
device_name (str): The name of the device to remove.
"""
for device_name in device_configs.keys():
if device_name not in self._device_list_items:
logger.warning(f"Device {device_name} not found in list.")
return
self._remove_list_item(device_name)
def _remove_list_item(self, device_name: str):
"""Remove a device from the list."""
# Get the list item
item = self._device_list_items.pop(device_name)
# Retrieve the custom widget attached to the item
widget = self._list_widget.itemWidget(item)
if widget is not None:
widget.deleteLater() # clean up custom widget
# Remove the item from the QListWidget
row = self._list_widget.row(item)
self._list_widget.takeItem(row)
def _run_device_validation(self, widget: ValidationListItem):
"""
Run the device validation in a separate thread.
Args:
widget (ValidationListItem): The widget to validate.
"""
if not READY_TO_TEST:
logger.error("Ophyd devices or bec_server not available, cannot run validation.")
return
if (
widget.device_name in self.client.device_manager.devices
): # TODO and config has to be exact the same..
self._on_device_validated(
widget.device_name,
ValidationStatus.VALID,
f"Device {widget.device_name} is already in active config",
)
return
runnable = DeviceValidationRunnable(
device_name=widget.device_name,
config=widget.device_config,
static_device_test=self.static_device_test,
connect=False,
)
runnable.signals.device_validated.connect(self._on_device_validated)
self._thread_pool.start(runnable)
@SafeSlot(str, bool, str)
def _on_device_validated(self, device_name: str, success: bool, message: str):
"""Handle the device validation result.
Args:
device_name (str): The name of the device.
success (bool): Whether the validation was successful.
message (str): The validation message.
"""
logger.info(f"Device {device_name} validation result: {success}, message: {message}")
item = self._device_list_items.get(device_name, None)
if not item:
logger.error(f"Device {device_name} not found in the list.")
return
if success:
self._remove_list_item(device_name=device_name)
self.device_validated.emit(device_name, ValidationStatus.VALID.value)
else:
widget: ValidationListItem = self._list_widget.itemWidget(item)
widget.on_validation_failed(message)
self.device_validated.emit(device_name, ValidationStatus.FAILED.value)
def _on_current_item_changed(
self, current: QtWidgets.QListWidgetItem, previous: QtWidgets.QListWidgetItem
):
"""Handle the current item change in the list widget.
Args:
current (QListWidgetItem): The currently selected item.
previous (QListWidgetItem): The previously selected item.
"""
widget: ValidationListItem = self._list_widget.itemWidget(current)
if widget:
try:
formatted_html = self._format_validation_message(widget.validation_msg)
self._text_box.setHtml(formatted_html)
except Exception as e:
logger.error(f"Error formatting validation message: {e}")
self._text_box.setPlainText(widget.validation_msg)
def _format_validation_message(self, raw_msg: str) -> str:
"""Simple HTML formatting for validation messages, wrapping text naturally."""
if not raw_msg.strip():
return "<i>Validation in progress...</i>"
if raw_msg == "Validation in progress...":
return "<i>Validation in progress...</i>"
raw_msg = escape(raw_msg)
# Split into lines
lines = raw_msg.splitlines()
summary = lines[0] if lines else "Validation Result"
rest = "\n".join(lines[1:]).strip()
# Split traceback / final ERROR
tb_match = re.search(r"(Traceback.*|ERROR:.*)$", rest, re.DOTALL | re.MULTILINE)
if tb_match:
main_text = rest[: tb_match.start()].strip()
error_detail = tb_match.group().strip()
else:
main_text = rest
error_detail = ""
# Highlight field names in orange (simple regex for word: Field)
main_text_html = re.sub(
r"(\b\w+\b)(?=: Field required)",
r'<span style="color:#FF8C00; font-weight:bold;">\1</span>',
main_text,
)
# Wrap in div for monospace, allowing wrapping
main_text_html = (
f'<div style="white-space: pre-wrap;">{main_text_html}</div>' if main_text_html else ""
)
# Traceback / error in red
error_html = (
f'<div style="white-space: pre-wrap; color:#A00000;">{error_detail}</div>'
if error_detail
else ""
)
# Summary at top, dark red
html = (
f'<div style="font-family: monospace; font-size:13px; white-space: pre-wrap;">'
f'<div style="font-weight:bold; color:#8B0000; margin-bottom:4px;">{summary}</div>'
f"{main_text_html}"
f"{error_html}"
f"</div>"
)
return html
@SafeSlot()
def clear_list(self):
"""Clear the device list."""
self._thread_pool.clear()
if self._thread_pool.waitForDone(2000) is False: # Wait for threads to finish
logger.error("Failed to wait for threads to finish. Removing items from the list.")
self._device_list_items.clear()
self._list_widget.clear()
def remove_device(self, device_name: str):
"""Remove a device from the list."""
item = self._device_list_items.pop(device_name, None)
if item:
self._list_widget.removeItemWidget(item)
if __name__ == "__main__":
import sys
from bec_lib.bec_yaml_loader import yaml_load
# pylint: disable=ungrouped-imports
from qtpy.QtWidgets import QApplication
app = QApplication(sys.argv)
device_manager_ophyd_test = DMOphydTest()
config_path = "/Users/appel_c/work_psi_awi/bec_workspace/csaxs_bec/csaxs_bec/device_configs/endstation.yaml"
cfg = yaml_load(config_path)
cfg.update({"device_will_fail": {"name": "device_will_fail", "some_param": 1}})
device_manager_ophyd_test.add_device_configs(cfg)
device_manager_ophyd_test.show()
device_manager_ophyd_test.setWindowTitle("Device Manager Ophyd Test")
device_manager_ophyd_test.resize(800, 600)
sys.exit(app.exec_())
@@ -1,78 +0,0 @@
from copy import copy
import pytest
from bec_widgets.widgets.control.device_manager.components.available_device_resources.device_resource_backend import (
HashableDevice,
_HashableDeviceSet,
)
TEST_DEVICE_DICT = {
"name": "test_device",
"deviceClass": "TestDeviceClass",
"readoutPriority": "baseline",
"enabled": True,
}
def _test_device_dict(**kwargs):
new = copy(TEST_DEVICE_DICT)
new.update(kwargs)
return new
@pytest.mark.parametrize(
"kwargs_1, kwargs_2, kwargs_3, kwargs_4, n",
[
({}, {}, {}, {}, 1),
({}, {}, {}, {"deviceConfig": {"a": 1}}, 1),
({}, {}, {}, {"name": "test_device_2"}, 2),
({}, {}, {"name": "test_device_2"}, {"deviceClass": "OtherDeviceClass"}, 3),
],
)
def test_hashable_device_set_merges_equal(kwargs_1, kwargs_2, kwargs_3, kwargs_4, n):
item_1 = HashableDevice(**_test_device_dict(**kwargs_1))
item_2 = HashableDevice(**_test_device_dict(**kwargs_2))
item_3 = HashableDevice(**_test_device_dict(**kwargs_3))
item_4 = HashableDevice(**_test_device_dict(**kwargs_4))
test_set = _HashableDeviceSet((item_1, item_2, item_3, item_4))
assert len(test_set) == n
def test_hashable_device_set_or_adds_sources():
item_1 = HashableDevice(**_test_device_dict(), source_files={"a", "b"})
item_2 = HashableDevice(**_test_device_dict(), source_files={"c", "d"})
set_1 = _HashableDeviceSet((item_1,))
set_2 = _HashableDeviceSet((item_2,))
combined = set_1 | set_2
assert len(combined) == 1
assert combined.pop().source_files == {"a", "b", "c", "d"}
def test_hashable_device_set_or_adds_tags():
item_1 = HashableDevice(
**_test_device_dict(deviceTags={"tag1"}, deviceConfig={"param": "value"}),
source_files={"a", "b"},
)
item_2 = HashableDevice(
**_test_device_dict(deviceTags={"tag2"}, deviceConfig={"param": "value"}),
source_files={"c", "d"},
)
item_3 = HashableDevice(
**_test_device_dict(deviceTags={"tag3"}, deviceConfig={"param": "other_value"}),
source_files={"q"},
)
set_1 = _HashableDeviceSet((item_1,))
set_2 = _HashableDeviceSet((item_2,))
set_3 = _HashableDeviceSet((item_3,))
combined = sorted(set_1 | set_2 | set_3, key=lambda hd: hd.deviceConfig["param"])
assert len(combined) == 2
assert combined[0].source_files == {"q"}
assert combined[0].deviceTags == {"tag3"}
assert combined[1].source_files == {"a", "b", "c", "d"}
assert combined[1].deviceTags == {"tag1", "tag2"}