proof of concept #84
@@ -13,10 +13,35 @@ logger = bec_logger.logger
|
||||
|
||||
|
||||
_Widgets = {
|
||||
"DataViewer": "DataViewer",
|
||||
"DigitalTwin": "DigitalTwin",
|
||||
}
|
||||
|
||||
|
||||
class DataViewer(RPCBase):
|
||||
"""Main widget of Data Viewer"""
|
||||
|
||||
_IMPORT_MODULE = "debye_bec.bec_widgets.widgets.data_viewer.data_viewer"
|
||||
|
||||
@rpc_call
|
||||
def remove(self):
|
||||
"""
|
||||
Cleanup the BECConnector
|
||||
"""
|
||||
|
||||
@rpc_call
|
||||
def attach(self):
|
||||
"""
|
||||
None
|
||||
"""
|
||||
|
||||
@rpc_call
|
||||
def detach(self):
|
||||
"""
|
||||
Detach the widget from its parent dock widget (if widget is in the dock), making it a floating widget.
|
||||
"""
|
||||
|
||||
|
||||
class DigitalTwin(RPCBase):
|
||||
"""Main widget of Digital Twin"""
|
||||
|
||||
|
||||
@@ -0,0 +1,286 @@
|
||||
"""
|
||||
Data Viewer: Custom BEC widget to view data from scans.
|
||||
"""
|
||||
|
||||
import sys
|
||||
from datetime import datetime
|
||||
from functools import partial
|
||||
|
||||
from bec_lib import bec_logger
|
||||
from bec_lib.endpoints import MessageEndpoints
|
||||
from bec_widgets.utils.bec_dispatcher import BECDispatcher
|
||||
from bec_widgets.utils.bec_widget import BECWidget
|
||||
from bec_widgets.utils.colors import apply_theme, get_accent_colors
|
||||
from bec_widgets.utils.error_popups import SafeSlot
|
||||
from qtpy.QtCore import Qt
|
||||
|
||||
# pylint: disable=E0611
|
||||
from qtpy.QtGui import QFont
|
||||
|
||||
# pylint: disable=E0611
|
||||
from qtpy.QtWidgets import (
|
||||
QApplication,
|
||||
QComboBox,
|
||||
QDoubleSpinBox,
|
||||
QGroupBox,
|
||||
QHBoxLayout,
|
||||
QLabel,
|
||||
QLayout,
|
||||
QPushButton,
|
||||
QVBoxLayout,
|
||||
QWidget,
|
||||
)
|
||||
|
||||
from debye_bec.bec_widgets.widgets.data_viewer.qt_widgets import TaggedListWidget
|
||||
from debye_bec.bec_widgets.widgets.data_viewer.viewer import HDF5Viewer
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
|
||||
class DataViewer(BECWidget, QWidget):
|
||||
"""
|
||||
Main widget of Data Viewer
|
||||
"""
|
||||
|
||||
PLUGIN = True
|
||||
ICON_NAME = "lightbulb"
|
||||
|
||||
def __init__(self, *arg, parent=None, **kwargs):
|
||||
super().__init__(parent=parent, theme_update=True, *arg, **kwargs)
|
||||
self.get_bec_shortcuts()
|
||||
|
||||
central = QWidget()
|
||||
self.root_layout = QVBoxLayout(central)
|
||||
|
||||
self.input = InputPanel()
|
||||
self.viewer = HDF5Viewer()
|
||||
|
||||
self.root_layout.addWidget(self.input, 0)
|
||||
self.root_layout.addWidget(self.viewer, 1)
|
||||
|
||||
self.setLayout(self.root_layout)
|
||||
self.setWindowTitle("Data Viewer")
|
||||
# self.resize(1800, 800)
|
||||
|
||||
self.history = []
|
||||
self.bec_dispatcher.connect_slot(self.on_history_update, MessageEndpoints.scan_history())
|
||||
self.on_history_update()
|
||||
|
||||
self.current_row = 0
|
||||
|
||||
logger.info(self.client.acl)
|
||||
logger.info(self.client.active_account)
|
||||
logger.info(self.client.proc)
|
||||
logger.info(self.client.username)
|
||||
|
||||
self.input.scan_sel.currentItemChanged_connect(self.scan_sel_changed)
|
||||
self.input.load_button.clicked_connect(self.load_dataset)
|
||||
self.input.unload_button.clicked_connect(self.unload_all_datasets)
|
||||
|
||||
@SafeSlot()
|
||||
def scan_sel_changed(self, *_, **kwargs):
|
||||
self.current_row = kwargs["value"]().row()
|
||||
|
||||
@SafeSlot()
|
||||
def load_dataset(
|
||||
self, *_
|
||||
): # TODO: Check scan file components for combined xas/xrd scans. Is the Pilatus file in there as well?
|
||||
scan = self.history[self.current_row]
|
||||
file = scan["file_components"][0] + b"_master." + scan["file_components"][1]
|
||||
logger.info(file.decode())
|
||||
self.viewer.load_files([file.decode()])
|
||||
|
||||
@SafeSlot()
|
||||
def unload_all_datasets(self, *_):
|
||||
self.viewer.clear_files()
|
||||
|
||||
def duration_string(self, start: str, end: str) -> str:
|
||||
start_dt = datetime.fromisoformat(start)
|
||||
end_dt = datetime.fromisoformat(end)
|
||||
|
||||
seconds = abs(int((end_dt - start_dt).total_seconds()))
|
||||
|
||||
days, remainder = divmod(seconds, 86400)
|
||||
hours, remainder = divmod(remainder, 3600)
|
||||
minutes, _ = divmod(remainder, 60)
|
||||
|
||||
parts = []
|
||||
|
||||
if days:
|
||||
parts.append(f"{days}d")
|
||||
if hours:
|
||||
parts.append(f"{hours}h")
|
||||
if minutes:
|
||||
parts.append(f"{minutes}min")
|
||||
|
||||
return " ".join(parts) if parts else "<1min"
|
||||
|
||||
@SafeSlot()
|
||||
def on_history_update(self, *_):
|
||||
self.history = []
|
||||
self.input.scan_sel.clear()
|
||||
# Get the length of the scan history, which is 0 when the bec server was started
|
||||
# and no scan has finished yet. Limit the history to the latest 20 scans.
|
||||
max_scans = min(len(self.client.history), 20)
|
||||
for n in range(1, max_scans): # last scans, up to 20
|
||||
# logger.info(self.client.history[-n].metadata["bec"]["status"])
|
||||
start_time = self.client.history[-n].metadata["start_time"]
|
||||
end_time = self.client.history[-n].metadata["end_time"]
|
||||
logger.info(type(start_time))
|
||||
scan_data = self.client.history[-n].metadata["bec"]
|
||||
# logger.info(scan_data)
|
||||
scan_number = scan_data["scan_number"]
|
||||
scan_name = scan_data["scan_name"]
|
||||
comment = scan_data["metadata"]["user_metadata"]["comment"]
|
||||
sample_name = scan_data["metadata"]["user_metadata"]["sample_name"]
|
||||
status = scan_data["status"]
|
||||
self.history.append(
|
||||
{
|
||||
"scan_number": scan_number,
|
||||
"scan_name": scan_name,
|
||||
"comment": comment,
|
||||
"sample_name": sample_name,
|
||||
"file_components": scan_data["file_components"],
|
||||
"start_time": start_time,
|
||||
"end_time": end_time,
|
||||
"status": status,
|
||||
}
|
||||
)
|
||||
tags = []
|
||||
tags.append((scan_name, "#4A90D9"))
|
||||
if sample_name != "":
|
||||
tags.append((sample_name, "#4A10D9"))
|
||||
if comment != "":
|
||||
tags.append((comment, "#FA10D9"))
|
||||
if status == "closed":
|
||||
tags.append((status, "#1F7023"))
|
||||
elif status == "halted":
|
||||
tags.append((status, "#A33047"))
|
||||
else:
|
||||
tags.append((status, "#656365"))
|
||||
tags.append((self.duration_string(start_time, end_time), "#656365"))
|
||||
self.input.scan_sel.addTaggedItem(label=str(scan_number), tags=tags)
|
||||
logger.info(f"Scan history: {self.history}")
|
||||
|
||||
|
||||
class InputPanel(QWidget):
|
||||
"""Panel for scan selection of the data viewer widget"""
|
||||
|
||||
def __init__(self, parent=None):
|
||||
super().__init__(parent)
|
||||
self._layout = QVBoxLayout(self)
|
||||
self._layout.setSizeConstraint(QLayout.SetFixedSize) # type: ignore
|
||||
|
||||
# Scan selection
|
||||
self.scan_sel = ListWidget("scan_sel", "Scan", ["Si", "Rh", "Pt"])
|
||||
self.load_button = Button(label_button="Load Dataset", enabled=True)
|
||||
self.unload_button = Button(label_button="Unload all", enabled=True)
|
||||
|
||||
# Assemble complete scan selection group
|
||||
self.input_group = Group(
|
||||
"Scan selection", [self.scan_sel, self.load_button, self.unload_button]
|
||||
)
|
||||
|
||||
self._layout.addWidget(self.input_group)
|
||||
self._layout.addStretch()
|
||||
|
||||
|
||||
class Group(QGroupBox):
|
||||
def __init__(self, label, widgets):
|
||||
super().__init__(label)
|
||||
self.layout = QVBoxLayout(self) # type: ignore
|
||||
for widget in widgets:
|
||||
self.layout.addWidget(widget) # type: ignore
|
||||
|
||||
|
||||
class ListWidget(QWidget):
|
||||
def __init__(self, identifier="", label="", enums=[]):
|
||||
super().__init__()
|
||||
layout = QHBoxLayout(self)
|
||||
layout.setContentsMargins(10, 0, 0, 0)
|
||||
layout.setSpacing(0)
|
||||
self.identifier = identifier
|
||||
# self.label = QLabel(label)
|
||||
# self.label.setFixedWidth(140)
|
||||
# self.label.setContentsMargins(0, 0, 10, 0)
|
||||
# self.label.setWordWrap(True)
|
||||
# layout.addWidget(self.label)
|
||||
self.value = TaggedListWidget()
|
||||
self.value.setFixedWidth(400)
|
||||
# for entry in enums:
|
||||
# self.value.addItem(entry)
|
||||
layout.addWidget(self.value)
|
||||
|
||||
def clear(self):
|
||||
self.value.clear()
|
||||
|
||||
def addTaggedItem(self, label, tags):
|
||||
self.value.addTaggedItem(label, tags)
|
||||
|
||||
def setCurrentIndex(self, text):
|
||||
self.value.setCurrentIndex(text)
|
||||
|
||||
# def currentIndex(self) -> int:
|
||||
# return self.value.currentIndex()
|
||||
|
||||
# def has_focus(self) -> bool:
|
||||
# return QApplication.focusWidget() is self.value.view()
|
||||
|
||||
def currentItemChanged_connect(self, func):
|
||||
"""Connect a function to the Enter/Return key press."""
|
||||
self.value.currentItemChanged.connect(
|
||||
partial(
|
||||
func,
|
||||
identifier=self.identifier,
|
||||
value_obj=self.value,
|
||||
value=lambda: self.value.currentIndex(),
|
||||
)
|
||||
)
|
||||
|
||||
def setDisabled(self, disable):
|
||||
self.value.setDisabled(disable)
|
||||
|
||||
|
||||
class Button(QWidget):
|
||||
def __init__(self, label=None, label_button: str = "", enabled=False):
|
||||
super().__init__()
|
||||
layout = QHBoxLayout(self)
|
||||
layout.setContentsMargins(10, 0, 0, 0)
|
||||
layout.setSpacing(0)
|
||||
if label is not None:
|
||||
self.label = QLabel(label)
|
||||
self.label.setFixedWidth(140)
|
||||
layout.addWidget(self.label)
|
||||
self.button = QPushButton(label_button)
|
||||
if label is not None:
|
||||
self.button.setFixedWidth(160)
|
||||
self.enable_button(enabled)
|
||||
layout.addWidget(self.button)
|
||||
|
||||
def clicked_connect(self, func):
|
||||
"""Connect a function to the button press."""
|
||||
self.button.clicked.connect(func)
|
||||
|
||||
def enable_button(self, enable: bool = False):
|
||||
if enable:
|
||||
self.button.setStyleSheet(
|
||||
f"QPushButton {{background-color: {get_accent_colors().default.name()}; color: white;}}"
|
||||
)
|
||||
self.button.setEnabled(True)
|
||||
else: # disabled
|
||||
self.button.setStyleSheet(
|
||||
"QPushButton {{background-color: rgb(120, 120, 120); color: white;}}"
|
||||
)
|
||||
self.button.setDisabled(True)
|
||||
|
||||
def setText(self, text):
|
||||
self.button.setText(text)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
app = QApplication(sys.argv)
|
||||
apply_theme("light")
|
||||
dispatcher = BECDispatcher(gui_id="data_viewer")
|
||||
win = DataViewer()
|
||||
win.show()
|
||||
sys.exit(app.exec_())
|
||||
@@ -0,0 +1 @@
|
||||
{'files': ['data_viewer.py']}
|
||||
@@ -0,0 +1,57 @@
|
||||
# Copyright (C) 2022 The Qt Company Ltd.
|
||||
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause
|
||||
|
||||
from bec_widgets.utils.bec_designer import designer_material_icon
|
||||
from qtpy.QtDesigner import QDesignerCustomWidgetInterface
|
||||
from qtpy.QtWidgets import QWidget
|
||||
|
||||
from debye_bec.bec_widgets.widgets.data_viewer.data_viewer import DataViewer
|
||||
|
||||
DOM_XML = """
|
||||
<ui language='c++'>
|
||||
<widget class='DataViewer' name='data_viewer'>
|
||||
</widget>
|
||||
</ui>
|
||||
"""
|
||||
|
||||
|
||||
class DataViewerPlugin(QDesignerCustomWidgetInterface): # pragma: no cover
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self._form_editor = None
|
||||
|
||||
def createWidget(self, parent):
|
||||
if parent is None:
|
||||
return QWidget()
|
||||
t = DataViewer(parent)
|
||||
return t
|
||||
|
||||
def domXml(self):
|
||||
return DOM_XML
|
||||
|
||||
def group(self):
|
||||
return ""
|
||||
|
||||
def icon(self):
|
||||
return designer_material_icon(DataViewer.ICON_NAME)
|
||||
|
||||
def includeFile(self):
|
||||
return "data_viewer"
|
||||
|
||||
def initialize(self, form_editor):
|
||||
self._form_editor = form_editor
|
||||
|
||||
def isContainer(self):
|
||||
return False
|
||||
|
||||
def isInitialized(self):
|
||||
return self._form_editor is not None
|
||||
|
||||
def name(self):
|
||||
return "DataViewer"
|
||||
|
||||
def toolTip(self):
|
||||
return "DataViewer"
|
||||
|
||||
def whatsThis(self):
|
||||
return self.toolTip()
|
||||
@@ -0,0 +1,151 @@
|
||||
"""
|
||||
TaggedListWidget — a QListWidget where each item shows a label + styled tag pills.
|
||||
Selection works natively; no popup, no paintEvent hacks.
|
||||
"""
|
||||
|
||||
import sys
|
||||
|
||||
from qtpy.QtCore import QPoint, QRect, QSize, Qt
|
||||
from qtpy.QtGui import QColor, QFont, QFontMetrics, QPainter, QPen
|
||||
from qtpy.QtWidgets import (
|
||||
QApplication,
|
||||
QLabel,
|
||||
QListWidget,
|
||||
QListWidgetItem,
|
||||
QStyle,
|
||||
QStyledItemDelegate,
|
||||
QStyleOptionViewItem,
|
||||
QVBoxLayout,
|
||||
QWidget,
|
||||
)
|
||||
|
||||
# ── Design tokens ──────────────────────────────────────────────────────────────
|
||||
ITEM_HEIGHT = 30
|
||||
H_PAD = 12
|
||||
TAG_H_PAD = 7
|
||||
TAG_V_PAD = 3
|
||||
TAG_GAP = 5
|
||||
LABEL_TAG_GAP = 12
|
||||
CORNER_RADIUS = 4
|
||||
TAG_TEXT_COLOR = "#FFFFFF"
|
||||
|
||||
# ── Enum compat (PySide6 nested vs PyQt5 flat) ────────────────────────────────
|
||||
try:
|
||||
_DEMIBOLD = QFont.Weight.DemiBold
|
||||
_MEDIUM = QFont.Weight.Medium
|
||||
except AttributeError:
|
||||
_DEMIBOLD = QFont.DemiBold
|
||||
_MEDIUM = QFont.Medium
|
||||
|
||||
_AlignVCenter = Qt.AlignmentFlag.AlignVCenter if hasattr(Qt, "AlignmentFlag") else Qt.AlignVCenter
|
||||
_AlignCenter = Qt.AlignmentFlag.AlignCenter if hasattr(Qt, "AlignmentFlag") else Qt.AlignCenter
|
||||
_UserRole = Qt.ItemDataRole.UserRole if hasattr(Qt, "ItemDataRole") else Qt.UserRole
|
||||
_NoPen = Qt.PenStyle.NoPen if hasattr(Qt, "PenStyle") else Qt.NoPen
|
||||
_AA = QPainter.RenderHint.Antialiasing if hasattr(QPainter, "RenderHint") else QPainter.Antialiasing
|
||||
|
||||
try:
|
||||
_State_Selected = QStyle.StateFlag.State_Selected
|
||||
_State_MouseOver = QStyle.StateFlag.State_MouseOver
|
||||
except AttributeError:
|
||||
_State_Selected = QStyle.State_Selected
|
||||
_State_MouseOver = QStyle.State_MouseOver
|
||||
|
||||
|
||||
# ── Delegate ──────────────────────────────────────────────────────────────────
|
||||
class TaggedDelegate(QStyledItemDelegate):
|
||||
|
||||
def sizeHint(self, option: QStyleOptionViewItem, index) -> QSize:
|
||||
return QSize(option.rect.width() or 300, ITEM_HEIGHT)
|
||||
|
||||
def paint(self, painter: QPainter, option: QStyleOptionViewItem, index) -> None:
|
||||
painter.save()
|
||||
|
||||
is_selected = bool(option.state & _State_Selected)
|
||||
is_hover = bool(option.state & _State_MouseOver)
|
||||
|
||||
# Background
|
||||
if is_selected:
|
||||
painter.fillRect(option.rect, option.palette.highlight())
|
||||
elif is_hover:
|
||||
painter.fillRect(option.rect, QColor("#F1F5F9"))
|
||||
else:
|
||||
painter.fillRect(option.rect, option.palette.base())
|
||||
|
||||
label_text = (
|
||||
index.data(
|
||||
Qt.ItemDataRole.DisplayRole if hasattr(Qt, "ItemDataRole") else Qt.DisplayRole
|
||||
)
|
||||
or ""
|
||||
)
|
||||
tags: list = index.data(_UserRole) or []
|
||||
|
||||
# Label
|
||||
label_font = painter.font() # inherit default font
|
||||
label_font.setWeight(_DEMIBOLD)
|
||||
painter.setFont(label_font)
|
||||
label_color = (
|
||||
option.palette.highlightedText().color()
|
||||
if is_selected
|
||||
else option.palette.text().color()
|
||||
)
|
||||
painter.setPen(QPen(label_color))
|
||||
|
||||
fm = QFontMetrics(label_font)
|
||||
label_w = fm.horizontalAdvance(label_text)
|
||||
label_y = option.rect.top() + (ITEM_HEIGHT - fm.height()) // 2 + fm.ascent()
|
||||
painter.drawText(QPoint(option.rect.left() + H_PAD, label_y), label_text)
|
||||
|
||||
# Tag pills
|
||||
tag_font = painter.font() # inherit default font
|
||||
tag_font.setWeight(_MEDIUM)
|
||||
fm_tag = QFontMetrics(tag_font)
|
||||
|
||||
x = option.rect.left() + H_PAD + label_w + LABEL_TAG_GAP
|
||||
for tag_text, hex_color in tags:
|
||||
tag_text = str(tag_text)
|
||||
tw = fm_tag.horizontalAdvance(tag_text) + 2 * TAG_H_PAD
|
||||
th = fm_tag.height() + 2 * TAG_V_PAD
|
||||
ty = option.rect.top() + (ITEM_HEIGHT - th) // 2
|
||||
pill = QRect(x, ty, tw, th)
|
||||
|
||||
fill = QColor(hex_color)
|
||||
if is_selected:
|
||||
fill = fill.lighter(140)
|
||||
|
||||
painter.setRenderHint(_AA)
|
||||
painter.setPen(_NoPen)
|
||||
painter.setBrush(fill)
|
||||
painter.drawRoundedRect(pill, CORNER_RADIUS, CORNER_RADIUS)
|
||||
|
||||
painter.setFont(tag_font)
|
||||
painter.setPen(QPen(QColor(TAG_TEXT_COLOR)))
|
||||
painter.drawText(pill, _AlignCenter, tag_text)
|
||||
|
||||
x += tw + TAG_GAP
|
||||
|
||||
painter.restore()
|
||||
|
||||
|
||||
# ── Widget ────────────────────────────────────────────────────────────────────
|
||||
class TaggedListWidget(QListWidget):
|
||||
"""QListWidget with label + coloured tag pills per row."""
|
||||
|
||||
def __init__(self, parent=None) -> None:
|
||||
super().__init__(parent)
|
||||
self.setItemDelegate(TaggedDelegate(self))
|
||||
self.setMouseTracking(True) # enables hover highlight
|
||||
|
||||
def addTaggedItem(self, label: str, tags: list | None = None) -> QListWidgetItem:
|
||||
"""
|
||||
Add a row. tags is a list of (text, hex_color) pairs, e.g.
|
||||
[("v1.26", "#2563EB"), ("stable", "#16A34A")]
|
||||
"""
|
||||
item = QListWidgetItem(str(label))
|
||||
if tags:
|
||||
item.setData(_UserRole, list(tags))
|
||||
self.addItem(item)
|
||||
return item
|
||||
|
||||
def currentTags(self) -> list:
|
||||
item = self.currentItem()
|
||||
return item.data(_UserRole) or [] if item else []
|
||||
@@ -0,0 +1,15 @@
|
||||
def main(): # pragma: no cover
|
||||
from qtpy import PYSIDE6
|
||||
|
||||
if not PYSIDE6:
|
||||
print("PYSIDE6 is not available in the environment. Cannot patch designer.")
|
||||
return
|
||||
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
|
||||
|
||||
from debye_bec.bec_widgets.widgets.data_viewer.data_viewer_plugin import DataViewerPlugin
|
||||
|
||||
QPyDesignerCustomWidgetCollection.addCustomWidget(DataViewerPlugin())
|
||||
|
||||
|
||||
if __name__ == "__main__": # pragma: no cover
|
||||
main()
|
||||
@@ -0,0 +1,503 @@
|
||||
"""
|
||||
HDF5 Viewer — qtpy + pyqtgraph + h5py
|
||||
"""
|
||||
|
||||
import h5py
|
||||
import numpy as np
|
||||
import pyqtgraph as pg
|
||||
from bec_lib import bec_logger
|
||||
from bec_qthemes import material_icon
|
||||
from qtpy.QtCore import Qt
|
||||
from qtpy.QtGui import QColor, QFont
|
||||
from qtpy.QtWidgets import (
|
||||
QAbstractItemView,
|
||||
QGroupBox,
|
||||
QHBoxLayout,
|
||||
QHeaderView,
|
||||
QLabel,
|
||||
QMainWindow,
|
||||
QRadioButton,
|
||||
QSizePolicy,
|
||||
QSlider,
|
||||
QSplitter,
|
||||
QTableWidget,
|
||||
QTableWidgetItem,
|
||||
QTreeWidget,
|
||||
QTreeWidgetItem,
|
||||
QVBoxLayout,
|
||||
QWidget,
|
||||
)
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
# ── Palette ────────────────────────────────────────────────────────────────
|
||||
ACCENT = "#7C9CF5"
|
||||
TEXT_SEC = "#8A8FA8"
|
||||
SUCCESS = "#5FCEA8"
|
||||
|
||||
# pyqtgraph color tuples (R, G, B)
|
||||
PG_BG = (28, 30, 38)
|
||||
PG_PLOT_BG = (45, 48, 64)
|
||||
PG_ACCENT = (124, 156, 245)
|
||||
PG_GRID = (58, 61, 80)
|
||||
PG_TEXT = (138, 143, 168)
|
||||
|
||||
ICON_SIZE = 20
|
||||
|
||||
# ── pyqtgraph global config ────────────────────────────────────────────────
|
||||
pg.setConfigOptions(
|
||||
background=pg.mkColor(*PG_BG),
|
||||
foreground=pg.mkColor(*PG_TEXT),
|
||||
antialias=True,
|
||||
useOpenGL=False, # safer default; set True for large datasets if GPU available
|
||||
)
|
||||
|
||||
|
||||
def _styled_plot_widget(**kwargs) -> pg.PlotWidget:
|
||||
"""Return a PlotWidget already themed to match the dark palette."""
|
||||
pw = pg.PlotWidget(**kwargs)
|
||||
# pw.setBackground(pg.mkColor(*PG_PLOT_BG))
|
||||
ax = pw.getPlotItem()
|
||||
for side in ("left", "bottom", "right", "top"):
|
||||
ax.getAxis(side).setPen(pg.mkPen(color=PG_GRID, width=1))
|
||||
ax.getAxis(side).setTextPen(pg.mkPen(color=PG_TEXT))
|
||||
ax.showGrid(x=True, y=True, alpha=0.25)
|
||||
return pw
|
||||
|
||||
|
||||
# ── HDF5 Tree helpers ──────────────────────────────────────────────────────
|
||||
# TYPE_ICONS = {"group": "📂", "dataset": "📊"}
|
||||
|
||||
|
||||
def dtype_tag(ds):
|
||||
d = ds.dtype
|
||||
if np.issubdtype(d, np.integer):
|
||||
return "int"
|
||||
if np.issubdtype(d, np.floating):
|
||||
return "float"
|
||||
if np.issubdtype(d, np.complexfloating):
|
||||
return "complex"
|
||||
if d.kind in ("S", "U", "O"):
|
||||
return "str"
|
||||
return str(d)
|
||||
|
||||
|
||||
def populate_tree(parent_item, h5_obj):
|
||||
folder_icon = material_icon("folder", size=(ICON_SIZE, ICON_SIZE), color="#2980b9")
|
||||
vector_icon = material_icon("show_chart", size=(ICON_SIZE, ICON_SIZE), color="#2980b9")
|
||||
array_icon = material_icon("stacked_line_chart", size=(ICON_SIZE, ICON_SIZE), color="#2980b9")
|
||||
scalar_icon = material_icon("point_scan", size=(ICON_SIZE, ICON_SIZE), color="#2980b9")
|
||||
str_icon = material_icon("text_snippet", size=(ICON_SIZE, ICON_SIZE), color="#2980b9")
|
||||
|
||||
if not isinstance(h5_obj, h5py.Group):
|
||||
return
|
||||
|
||||
for key in h5_obj.keys():
|
||||
try:
|
||||
child = h5_obj[key]
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
if isinstance(child, h5py.Group):
|
||||
item = QTreeWidgetItem(parent_item, [key, "Group", ""])
|
||||
item.setIcon(0, folder_icon)
|
||||
|
||||
item.setData(0, Qt.UserRole, child.name)
|
||||
item.setData(0, Qt.UserRole + 1, "group")
|
||||
|
||||
item.setForeground(0, QColor(ACCENT))
|
||||
|
||||
populate_tree(item, child)
|
||||
|
||||
elif isinstance(child, h5py.Dataset):
|
||||
shape_str = "x".join(str(s) for s in child.shape) or "scalar"
|
||||
|
||||
dtype = dtype_tag(child)
|
||||
|
||||
if shape_str == "scalar":
|
||||
if dtype == "str":
|
||||
icon = str_icon
|
||||
else:
|
||||
icon = scalar_icon
|
||||
elif "x" in shape_str:
|
||||
icon = array_icon
|
||||
else:
|
||||
icon = vector_icon
|
||||
|
||||
item = QTreeWidgetItem(parent_item, [key, dtype, shape_str])
|
||||
item.setIcon(0, icon)
|
||||
|
||||
item.setData(0, Qt.UserRole, child.name)
|
||||
item.setData(0, Qt.UserRole + 1, "dataset")
|
||||
|
||||
item.setForeground(1, QColor(SUCCESS))
|
||||
item.setForeground(2, QColor(TEXT_SEC))
|
||||
|
||||
|
||||
# ── Data / Plot panel ──────────────────────────────────────────────────────
|
||||
class DataPanel(QWidget):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self._layout = QVBoxLayout(self)
|
||||
|
||||
# Header
|
||||
hdr = QHBoxLayout()
|
||||
self.path_label = QLabel("Select a dataset from the tree")
|
||||
self.path_label.setObjectName("path_label")
|
||||
self.info_label = QLabel("")
|
||||
self.info_label.setObjectName("info_label")
|
||||
hdr.addWidget(self.path_label, 1)
|
||||
hdr.addWidget(self.info_label)
|
||||
self._layout.addLayout(hdr)
|
||||
|
||||
# View-mode selector
|
||||
mode_box = QGroupBox("View mode")
|
||||
mode_layout = QHBoxLayout(mode_box)
|
||||
mode_layout.setContentsMargins(8, 4, 8, 4)
|
||||
self.rb_auto = QRadioButton("Auto")
|
||||
self.rb_plot = QRadioButton("Plot")
|
||||
self.rb_image = QRadioButton("Image")
|
||||
self.rb_table = QRadioButton("Table")
|
||||
self.rb_auto.setChecked(True)
|
||||
for rb in (self.rb_auto, self.rb_plot, self.rb_image, self.rb_table):
|
||||
mode_layout.addWidget(rb)
|
||||
rb.toggled.connect(self._on_mode_change)
|
||||
mode_layout.addStretch()
|
||||
self._layout.addWidget(mode_box)
|
||||
|
||||
# Content stack
|
||||
self.stack = QWidget()
|
||||
self.stack_layout = QVBoxLayout(self.stack)
|
||||
self.stack_layout.setContentsMargins(0, 0, 0, 0)
|
||||
self._layout.addWidget(self.stack, 1)
|
||||
|
||||
self._current_data = None
|
||||
self._show_empty()
|
||||
|
||||
# ── helpers ───────────────────────────────────────────────────────────
|
||||
def _clear_stack(self):
|
||||
while self.stack_layout.count():
|
||||
item = self.stack_layout.takeAt(0)
|
||||
if item.widget():
|
||||
item.widget().deleteLater()
|
||||
|
||||
def _show_empty(self):
|
||||
self._clear_stack()
|
||||
lbl = QLabel("No data selected")
|
||||
lbl.setObjectName("info_label")
|
||||
lbl.setAlignment(Qt.AlignCenter)
|
||||
self.stack_layout.addWidget(lbl)
|
||||
|
||||
def _on_mode_change(self):
|
||||
if self._current_data is not None:
|
||||
self.display(self._current_data, self.path_label.text())
|
||||
|
||||
def _active_mode(self):
|
||||
if self.rb_plot.isChecked():
|
||||
return "plot"
|
||||
if self.rb_image.isChecked():
|
||||
return "image"
|
||||
if self.rb_table.isChecked():
|
||||
return "table"
|
||||
return "auto"
|
||||
|
||||
# ── public ────────────────────────────────────────────────────────────
|
||||
def display(self, data, path=""):
|
||||
self._current_data = data
|
||||
self.path_label.setText(path)
|
||||
|
||||
if not isinstance(data, np.ndarray):
|
||||
data = np.array(data)
|
||||
|
||||
self.info_label.setText(
|
||||
f"shape {data.shape} · dtype {data.dtype} · {data.size:,} elements"
|
||||
)
|
||||
|
||||
mode = self._active_mode()
|
||||
if mode == "auto":
|
||||
if data.ndim <= 1 and data.size > 1:
|
||||
mode = "plot"
|
||||
elif data.ndim == 2 and min(data.shape) > 1:
|
||||
mode = "image"
|
||||
else:
|
||||
mode = "table"
|
||||
|
||||
if (
|
||||
mode == "plot"
|
||||
): # TODO disable plot and image if data is str, maybe completely disable manual mode and only allow image/plot for arrays
|
||||
self._show_plot_1d(data)
|
||||
elif mode == "image":
|
||||
self._show_image_2d(data)
|
||||
else:
|
||||
self._show_table(data)
|
||||
|
||||
# ── 1-D line plot ──────────────────────────────────────────────────────
|
||||
def _show_plot_1d(self, data):
|
||||
self._clear_stack()
|
||||
|
||||
is_2d = data.ndim == 2
|
||||
|
||||
if is_2d:
|
||||
n_rows, n_cols = data.shape
|
||||
row_data = data[0].astype(np.float32)
|
||||
else:
|
||||
row_data = data.reshape(-1).astype(np.float32)
|
||||
|
||||
x = np.arange(row_data.size, dtype=np.float32)
|
||||
|
||||
pw = _styled_plot_widget()
|
||||
pw.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
|
||||
|
||||
plot_item = pw.getPlotItem()
|
||||
plot_item.setAutoVisible(y=False)
|
||||
|
||||
curve = pg.PlotDataItem(
|
||||
x, row_data, pen=pg.mkPen(color=PG_ACCENT, width=1.6), antialias=False
|
||||
)
|
||||
pw.addItem(curve)
|
||||
|
||||
curve.setDownsampling(auto=True, method="peak")
|
||||
curve.setClipToView(True)
|
||||
curve.setSkipFiniteCheck(True)
|
||||
|
||||
# plot_item.setLabel("bottom", "index")
|
||||
# plot_item.setLabel("left", "value")
|
||||
plot_item.enableAutoRange()
|
||||
|
||||
if is_2d:
|
||||
# --- Slider ---
|
||||
slider = QSlider(Qt.Vertical)
|
||||
slider.setMinimum(0)
|
||||
slider.setMaximum(n_rows - 1)
|
||||
slider.setValue(0)
|
||||
# slider.setInvertedAppearance(True) # row 0 at top
|
||||
slider.setFixedWidth(32)
|
||||
# slider.setSingleStep(1)
|
||||
slider.setPageStep(1)
|
||||
|
||||
row_label = QLabel("0")
|
||||
row_label.setAlignment(Qt.AlignCenter)
|
||||
row_label.setFixedWidth(32)
|
||||
# row_label.setStyleSheet("color: #aaa; font-size: 10px;")
|
||||
|
||||
def on_row_changed(row):
|
||||
row_label.setText(str(row))
|
||||
new_data = data[row].astype(np.float32)
|
||||
new_x = np.arange(new_data.size, dtype=np.float32)
|
||||
curve.setData(new_x, new_data)
|
||||
plot_item.enableAutoRange()
|
||||
|
||||
slider.valueChanged.connect(on_row_changed)
|
||||
|
||||
slider_col = QWidget()
|
||||
slider_col.setFixedWidth(36)
|
||||
col_layout = QVBoxLayout(slider_col)
|
||||
col_layout.setContentsMargins(0, 0, 0, 0)
|
||||
col_layout.setSpacing(2)
|
||||
col_layout.addWidget(row_label)
|
||||
col_layout.addWidget(slider)
|
||||
|
||||
container = QWidget()
|
||||
h_layout = QHBoxLayout(container)
|
||||
h_layout.setContentsMargins(0, 0, 0, 0)
|
||||
h_layout.setSpacing(4)
|
||||
h_layout.addWidget(slider_col)
|
||||
h_layout.addWidget(pw)
|
||||
|
||||
self.stack_layout.addWidget(container)
|
||||
else:
|
||||
self.stack_layout.addWidget(pw)
|
||||
|
||||
# ── 2-D image ──────────────────────────────────────────────────────────
|
||||
def _show_image_2d(self, data):
|
||||
self._clear_stack()
|
||||
|
||||
squeezed = np.squeeze(data)
|
||||
if squeezed.ndim > 2:
|
||||
squeezed = squeezed.reshape(-1, squeezed.shape[-1])
|
||||
|
||||
# complex → magnitude
|
||||
img_data = np.abs(squeezed) if np.iscomplexobj(squeezed) else squeezed.astype(float)
|
||||
|
||||
# ImageView gives us colorbar + histogram + zoom for free
|
||||
iv = pg.ImageView()
|
||||
iv.getView().setBackgroundColor(pg.mkColor(*PG_BG))
|
||||
iv.ui.histogram.setBackground(pg.mkColor(*PG_PLOT_BG))
|
||||
iv.ui.roiBtn.hide()
|
||||
iv.ui.menuBtn.hide()
|
||||
|
||||
# Use 'inferno'-like LUT
|
||||
iv.setColorMap(pg.colormap.get("inferno", source="matplotlib"))
|
||||
|
||||
# pyqtgraph ImageView expects (cols, rows) — transpose so row 0 is at top
|
||||
iv.setImage(img_data.T, autoLevels=True, autoHistogramRange=True)
|
||||
iv.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
|
||||
|
||||
self.stack_layout.addWidget(iv)
|
||||
|
||||
# ── Table ──────────────────────────────────────────────────────────────
|
||||
def _show_table(self, data):
|
||||
self._clear_stack()
|
||||
|
||||
MAX_ROWS, MAX_COLS = 2000, 500
|
||||
|
||||
if data.ndim == 0:
|
||||
flat = data.reshape(1, 1)
|
||||
elif data.ndim == 1:
|
||||
flat = data.reshape(-1, 1)
|
||||
elif data.ndim == 2:
|
||||
flat = data
|
||||
else:
|
||||
flat = data.reshape(-1, data.shape[-1])
|
||||
|
||||
rows, cols = flat.shape
|
||||
show_rows = min(rows, MAX_ROWS)
|
||||
show_cols = min(cols, MAX_COLS)
|
||||
|
||||
if rows > MAX_ROWS or cols > MAX_COLS:
|
||||
note = QLabel(f"⚠ Showing {show_rows}/{rows} rows × {show_cols}/{cols} cols")
|
||||
note.setObjectName("info_label")
|
||||
note.setAlignment(Qt.AlignCenter)
|
||||
self.stack_layout.addWidget(note)
|
||||
|
||||
table = QTableWidget(show_rows, show_cols)
|
||||
table.setEditTriggers(QAbstractItemView.NoEditTriggers)
|
||||
table.setSelectionMode(QAbstractItemView.ContiguousSelection)
|
||||
table.horizontalHeader().setSectionResizeMode(QHeaderView.Interactive)
|
||||
table.horizontalHeader().setDefaultSectionSize(90)
|
||||
table.verticalHeader().setDefaultSectionSize(22)
|
||||
table.setFont(QFont("JetBrains Mono, Consolas, monospace", 10))
|
||||
|
||||
is_float = np.issubdtype(flat.dtype, np.floating)
|
||||
is_complex = np.iscomplexobj(flat)
|
||||
is_bytes = flat.dtype.kind == "S"
|
||||
|
||||
for r in range(show_rows):
|
||||
for c in range(show_cols):
|
||||
val = flat[r, c]
|
||||
txt = (
|
||||
f"{val:.6g}"
|
||||
if is_float
|
||||
else f"{val:.4g}" if is_complex else str(val.decode()) if is_bytes else str(val)
|
||||
)
|
||||
cell = QTableWidgetItem(txt)
|
||||
cell.setTextAlignment(Qt.AlignRight | Qt.AlignVCenter)
|
||||
table.setItem(r, c, cell)
|
||||
|
||||
self.stack_layout.addWidget(table)
|
||||
|
||||
|
||||
# ── Main window ────────────────────────────────────────────────────────────
|
||||
class HDF5Viewer(QMainWindow):
|
||||
def __init__(self, filepath=None):
|
||||
super().__init__()
|
||||
self.h5files = {} # filepath -> h5py.File
|
||||
self._build_ui()
|
||||
if filepath:
|
||||
self.load_files([filepath])
|
||||
|
||||
def load_files(self, filepaths: list[str]):
|
||||
"""Open one or more HDF5 files and add each as a top-level tree node."""
|
||||
for f in filepaths:
|
||||
if f in self.h5files:
|
||||
continue # already loaded
|
||||
self.h5files[f] = h5py.File(f, "r")
|
||||
self._add_file_to_tree(f)
|
||||
|
||||
def _build_ui(self):
|
||||
central = QWidget()
|
||||
self.setCentralWidget(central)
|
||||
|
||||
root_layout = QHBoxLayout(central)
|
||||
|
||||
splitter = QSplitter(Qt.Horizontal)
|
||||
splitter.setChildrenCollapsible(False)
|
||||
|
||||
# ── Left pane ──
|
||||
left_pane = QWidget()
|
||||
left_layout = QVBoxLayout(left_pane)
|
||||
|
||||
self.tree = QTreeWidget()
|
||||
self.tree.setHeaderLabels(["Name", "Type", "Shape"])
|
||||
self.tree.header().setStretchLastSection(False)
|
||||
self.tree.header().setSectionResizeMode(0, QHeaderView.Stretch)
|
||||
self.tree.header().setSectionResizeMode(1, QHeaderView.ResizeToContents)
|
||||
self.tree.header().setSectionResizeMode(2, QHeaderView.ResizeToContents)
|
||||
self.tree.itemClicked.connect(self._on_item_clicked)
|
||||
|
||||
left_layout.addWidget(self.tree, 1)
|
||||
|
||||
# ── Right pane ──
|
||||
self.data_panel = DataPanel()
|
||||
|
||||
splitter.addWidget(left_pane)
|
||||
splitter.addWidget(self.data_panel)
|
||||
|
||||
splitter.setStretchFactor(0, 1)
|
||||
splitter.setStretchFactor(1, 3)
|
||||
splitter.setSizes([300, 900])
|
||||
splitter.setHandleWidth(6)
|
||||
splitter.setChildrenCollapsible(False)
|
||||
|
||||
root_layout.addWidget(splitter)
|
||||
|
||||
def _add_file_to_tree(self, filepath: str):
|
||||
"""Add a single file as a new top-level node in the tree."""
|
||||
h5file = self.h5files[filepath]
|
||||
filename = filepath.split("/")[-1]
|
||||
|
||||
dataset_icon = material_icon("dataset", size=(ICON_SIZE, ICON_SIZE), color="#2980b9")
|
||||
root_item = QTreeWidgetItem(self.tree, [filename, "Group", ""])
|
||||
root_item.setIcon(0, dataset_icon)
|
||||
root_item.setData(0, Qt.UserRole, "/")
|
||||
root_item.setData(0, Qt.UserRole + 1, "group")
|
||||
root_item.setData(0, Qt.UserRole + 2, filepath) # so clicks know which file
|
||||
root_item.setForeground(0, QColor(ACCENT))
|
||||
|
||||
populate_tree(root_item, h5file)
|
||||
self.tree.addTopLevelItem(root_item)
|
||||
|
||||
# Expand first 2 levels
|
||||
self.tree.expandItem(root_item)
|
||||
for i in range(root_item.childCount()):
|
||||
self.tree.expandItem(root_item.child(i))
|
||||
|
||||
self.tree.setCurrentItem(root_item)
|
||||
|
||||
def _get_filepath_for_item(self, item: QTreeWidgetItem) -> str:
|
||||
"""Walk up the tree to find the filepath stored on the root node."""
|
||||
node = item
|
||||
while node.parent():
|
||||
node = node.parent()
|
||||
return node.data(0, Qt.UserRole + 2)
|
||||
|
||||
def _on_item_clicked(self, item, _col):
|
||||
path = item.data(0, Qt.UserRole)
|
||||
kind = item.data(0, Qt.UserRole + 1)
|
||||
filepath = self._get_filepath_for_item(item)
|
||||
|
||||
if not path or not filepath:
|
||||
return
|
||||
|
||||
h5file = self.h5files[filepath]
|
||||
obj = h5file[path] if path != "/" else h5file
|
||||
|
||||
if kind == "dataset":
|
||||
data = h5file[path][()]
|
||||
self.data_panel.display(data, path)
|
||||
else:
|
||||
n = len(obj) if isinstance(obj, h5py.Group) else 0
|
||||
|
||||
def clear_files(self):
|
||||
"""Close all open HDF5 files and reset the tree."""
|
||||
for h5file in self.h5files.values():
|
||||
h5file.close()
|
||||
self.h5files.clear()
|
||||
self.tree.clear()
|
||||
self.data_panel._show_empty() # TODO: If it works, make function public!
|
||||
|
||||
def closeEvent(self, event):
|
||||
for h5file in self.h5files.values():
|
||||
h5file.close()
|
||||
self.h5files.clear()
|
||||
super().closeEvent(event)
|
||||
@@ -5,9 +5,11 @@ from __future__ import annotations
|
||||
# pylint: skip-file
|
||||
|
||||
designer_plugins = {
|
||||
"DataViewer": ("debye_bec.bec_widgets.widgets.data_viewer.data_viewer", "DataViewer"),
|
||||
"DigitalTwin": ("debye_bec.bec_widgets.widgets.digital_twin.digital_twin", "DigitalTwin"),
|
||||
}
|
||||
|
||||
widget_icons = {
|
||||
"DataViewer": "lightbulb",
|
||||
"DigitalTwin": "lightbulb",
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user