widget development and hutch cameras fix #86
@@ -2,9 +2,12 @@
|
||||
Data Viewer: Custom BEC widget to view data from scans.
|
||||
"""
|
||||
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
from datetime import datetime
|
||||
from functools import partial
|
||||
from typing import Literal, Optional, cast
|
||||
|
||||
from bec_lib import bec_logger
|
||||
from bec_lib.endpoints import MessageEndpoints
|
||||
@@ -36,6 +39,8 @@ from debye_bec.bec_widgets.widgets.data_viewer.viewer import HDF5Viewer
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
MAX_HIST_LEN = 100
|
||||
|
||||
|
||||
class DataViewer(BECWidget, QWidget):
|
||||
"""
|
||||
@@ -43,12 +48,14 @@ class DataViewer(BECWidget, QWidget):
|
||||
"""
|
||||
|
||||
PLUGIN = True
|
||||
ICON_NAME = "lightbulb"
|
||||
ICON_NAME = "find_in_page"
|
||||
|
||||
def __init__(self, *arg, parent=None, **kwargs):
|
||||
super().__init__(parent=parent, theme_update=True, *arg, **kwargs)
|
||||
self.get_bec_shortcuts()
|
||||
|
||||
logger.info(f"Type of self.client: {type(self.client)}")
|
||||
|
||||
central = QWidget()
|
||||
self.root_layout = QVBoxLayout(central)
|
||||
|
||||
@@ -76,19 +83,44 @@ class DataViewer(BECWidget, QWidget):
|
||||
self.input.scan_sel.currentItemChanged_connect(self.scan_sel_changed)
|
||||
self.input.load_button.clicked_connect(self.load_dataset)
|
||||
self.input.unload_button.clicked_connect(self.unload_all_datasets)
|
||||
self.input.open_fm_button.clicked_connect(self.open_in_file_manager)
|
||||
|
||||
def apply_theme(self, theme: Literal["dark", "light"]):
|
||||
"""
|
||||
Apply the theme
|
||||
|
||||
Args:
|
||||
theme (str): Theme, either "dark" or "light"
|
||||
"""
|
||||
self.viewer.apply_theme(theme)
|
||||
self.on_history_update()
|
||||
|
||||
@SafeSlot()
|
||||
def scan_sel_changed(self, *_, **kwargs):
|
||||
self.current_row = kwargs["value"]().row()
|
||||
|
||||
@SafeSlot()
|
||||
def open_in_file_manager(self, *_):
|
||||
if len(self.history) > 0:
|
||||
scan = self.history[self.current_row]
|
||||
filepath = scan["file_components"][0].decode().rsplit("/", 1)[0]
|
||||
subprocess.Popen(
|
||||
["xdg-open", filepath],
|
||||
stdout=subprocess.DEVNULL,
|
||||
stderr=subprocess.DEVNULL,
|
||||
stdin=subprocess.DEVNULL,
|
||||
start_new_session=True,
|
||||
)
|
||||
|
||||
@SafeSlot()
|
||||
def load_dataset(
|
||||
self, *_
|
||||
): # TODO: Check scan file components for combined xas/xrd scans. Is the Pilatus file in there as well?
|
||||
scan = self.history[self.current_row]
|
||||
file = scan["file_components"][0] + b"_master." + scan["file_components"][1]
|
||||
logger.info(file.decode())
|
||||
self.viewer.load_files([file.decode()])
|
||||
if len(self.history) > 0:
|
||||
scan = self.history[self.current_row]
|
||||
file = scan["file_components"][0] + b"_master." + scan["file_components"][1]
|
||||
logger.info(file.decode())
|
||||
self.viewer.load_files([file.decode()])
|
||||
|
||||
@SafeSlot()
|
||||
def unload_all_datasets(self, *_):
|
||||
@@ -121,12 +153,12 @@ class DataViewer(BECWidget, QWidget):
|
||||
self.input.scan_sel.clear()
|
||||
# Get the length of the scan history, which is 0 when the bec server was started
|
||||
# and no scan has finished yet. Limit the history to the latest 20 scans.
|
||||
max_scans = min(len(self.client.history), 20)
|
||||
for n in range(1, max_scans): # last scans, up to 20
|
||||
max_scans = min(len(self.client.history), MAX_HIST_LEN)
|
||||
for n in range(1, max_scans): # last scans, limited by MAX_HIST_LEN
|
||||
# logger.info(self.client.history[-n].metadata["bec"]["status"])
|
||||
start_time = self.client.history[-n].metadata["start_time"]
|
||||
end_time = self.client.history[-n].metadata["end_time"]
|
||||
logger.info(type(start_time))
|
||||
# logger.info(type(start_time))
|
||||
scan_data = self.client.history[-n].metadata["bec"]
|
||||
# logger.info(scan_data)
|
||||
scan_number = scan_data["scan_number"]
|
||||
@@ -147,20 +179,20 @@ class DataViewer(BECWidget, QWidget):
|
||||
}
|
||||
)
|
||||
tags = []
|
||||
tags.append((scan_name, "#4A90D9"))
|
||||
tags.append((scan_name, get_accent_colors().default.name()))
|
||||
if sample_name != "":
|
||||
tags.append((sample_name, "#4A10D9"))
|
||||
tags.append((sample_name, get_accent_colors().highlight.name()))
|
||||
if comment != "":
|
||||
tags.append((comment, "#FA10D9"))
|
||||
tags.append((comment, get_accent_colors().warning.name()))
|
||||
if status == "closed":
|
||||
tags.append((status, "#1F7023"))
|
||||
tags.append((status, get_accent_colors().success.name()))
|
||||
elif status == "halted":
|
||||
tags.append((status, "#A33047"))
|
||||
tags.append((status, get_accent_colors().emergency.name()))
|
||||
else:
|
||||
tags.append((status, "#656365"))
|
||||
tags.append((self.duration_string(start_time, end_time), "#656365"))
|
||||
self.input.scan_sel.addTaggedItem(label=str(scan_number), tags=tags)
|
||||
logger.info(f"Scan history: {self.history}")
|
||||
# logger.info(f"Scan history: {self.history}")
|
||||
|
||||
|
||||
class InputPanel(QWidget):
|
||||
@@ -168,29 +200,44 @@ class InputPanel(QWidget):
|
||||
|
||||
def __init__(self, parent=None):
|
||||
super().__init__(parent)
|
||||
self._layout = QVBoxLayout(self)
|
||||
self._layout.setSizeConstraint(QLayout.SetFixedSize) # type: ignore
|
||||
self._layout = QHBoxLayout(self)
|
||||
# self._layout.setSizeConstraint(QLayout.SetFixedSize) # type: ignore
|
||||
|
||||
# Scan selection
|
||||
self.scan_sel = ListWidget("scan_sel", "Scan", ["Si", "Rh", "Pt"])
|
||||
self.load_button = Button(label_button="Load Dataset", enabled=True)
|
||||
self.unload_button = Button(label_button="Unload all", enabled=True)
|
||||
self.open_fm_button = Button(label_button="Open in File Manager", enabled=True)
|
||||
|
||||
self._button_layout = QVBoxLayout()
|
||||
self._button_layout.addWidget(self.load_button)
|
||||
self._button_layout.addWidget(self.unload_button)
|
||||
self._button_layout.addWidget(self.open_fm_button)
|
||||
self._button_layout.addStretch()
|
||||
|
||||
# Assemble complete scan selection group
|
||||
self.input_group = Group(
|
||||
"Scan selection", [self.scan_sel, self.load_button, self.unload_button]
|
||||
"Scan selection", [self._button_layout, self.scan_sel], orientation="horizontal"
|
||||
)
|
||||
|
||||
self._layout.addWidget(self.input_group)
|
||||
self._layout.addStretch()
|
||||
# self._layout.addStretch()
|
||||
|
||||
|
||||
class Group(QGroupBox):
|
||||
def __init__(self, label, widgets):
|
||||
def __init__(self, label, objs, orientation="vertical"):
|
||||
super().__init__(label)
|
||||
self.layout = QVBoxLayout(self) # type: ignore
|
||||
for widget in widgets:
|
||||
self.layout.addWidget(widget) # type: ignore
|
||||
if orientation == "vertical":
|
||||
self._layout = QVBoxLayout(self) # type: ignore
|
||||
elif orientation == "horizontal": # assume horizontal
|
||||
self._layout = QHBoxLayout(self) # type: ignore
|
||||
else:
|
||||
raise ValueError(f"Orientation {orientation} is not supported!")
|
||||
for obj in objs:
|
||||
if isinstance(obj, QWidget):
|
||||
self._layout.addWidget(obj) # type: ignore
|
||||
elif isinstance(obj, QLayout):
|
||||
self._layout.addLayout(obj)
|
||||
|
||||
|
||||
class ListWidget(QWidget):
|
||||
@@ -206,7 +253,7 @@ class ListWidget(QWidget):
|
||||
# self.label.setWordWrap(True)
|
||||
# layout.addWidget(self.label)
|
||||
self.value = TaggedListWidget()
|
||||
self.value.setFixedWidth(400)
|
||||
# self.value.setFixedWidth(400)
|
||||
# for entry in enums:
|
||||
# self.value.addItem(entry)
|
||||
layout.addWidget(self.value)
|
||||
|
||||
@@ -2,15 +2,23 @@
|
||||
HDF5 Viewer — qtpy + pyqtgraph + h5py
|
||||
"""
|
||||
|
||||
from typing import Literal, Optional, cast
|
||||
|
||||
import h5py
|
||||
import numpy as np
|
||||
import pyqtgraph as pg
|
||||
from bec_lib import bec_logger
|
||||
from bec_qthemes import material_icon
|
||||
from bec_widgets.utils.colors import Colors, get_accent_colors
|
||||
from qtpy.QtCore import Qt
|
||||
from qtpy.QtGui import QColor, QFont
|
||||
|
||||
# pylint: disable=E0611
|
||||
from qtpy.QtGui import QBrush, QColor, QFont
|
||||
|
||||
# pylint: disable=E0611
|
||||
from qtpy.QtWidgets import (
|
||||
QAbstractItemView,
|
||||
QApplication,
|
||||
QGroupBox,
|
||||
QHBoxLayout,
|
||||
QHeaderView,
|
||||
@@ -30,109 +38,8 @@ from qtpy.QtWidgets import (
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
# ── Palette ────────────────────────────────────────────────────────────────
|
||||
ACCENT = "#7C9CF5"
|
||||
TEXT_SEC = "#8A8FA8"
|
||||
SUCCESS = "#5FCEA8"
|
||||
|
||||
# pyqtgraph color tuples (R, G, B)
|
||||
PG_BG = (28, 30, 38)
|
||||
PG_PLOT_BG = (45, 48, 64)
|
||||
PG_ACCENT = (124, 156, 245)
|
||||
PG_GRID = (58, 61, 80)
|
||||
PG_TEXT = (138, 143, 168)
|
||||
|
||||
ICON_SIZE = 20
|
||||
|
||||
# ── pyqtgraph global config ────────────────────────────────────────────────
|
||||
pg.setConfigOptions(
|
||||
background=pg.mkColor(*PG_BG),
|
||||
foreground=pg.mkColor(*PG_TEXT),
|
||||
antialias=True,
|
||||
useOpenGL=False, # safer default; set True for large datasets if GPU available
|
||||
)
|
||||
|
||||
|
||||
def _styled_plot_widget(**kwargs) -> pg.PlotWidget:
|
||||
"""Return a PlotWidget already themed to match the dark palette."""
|
||||
pw = pg.PlotWidget(**kwargs)
|
||||
# pw.setBackground(pg.mkColor(*PG_PLOT_BG))
|
||||
ax = pw.getPlotItem()
|
||||
for side in ("left", "bottom", "right", "top"):
|
||||
ax.getAxis(side).setPen(pg.mkPen(color=PG_GRID, width=1))
|
||||
ax.getAxis(side).setTextPen(pg.mkPen(color=PG_TEXT))
|
||||
ax.showGrid(x=True, y=True, alpha=0.25)
|
||||
return pw
|
||||
|
||||
|
||||
# ── HDF5 Tree helpers ──────────────────────────────────────────────────────
|
||||
# TYPE_ICONS = {"group": "📂", "dataset": "📊"}
|
||||
|
||||
|
||||
def dtype_tag(ds):
|
||||
d = ds.dtype
|
||||
if np.issubdtype(d, np.integer):
|
||||
return "int"
|
||||
if np.issubdtype(d, np.floating):
|
||||
return "float"
|
||||
if np.issubdtype(d, np.complexfloating):
|
||||
return "complex"
|
||||
if d.kind in ("S", "U", "O"):
|
||||
return "str"
|
||||
return str(d)
|
||||
|
||||
|
||||
def populate_tree(parent_item, h5_obj):
|
||||
folder_icon = material_icon("folder", size=(ICON_SIZE, ICON_SIZE), color="#2980b9")
|
||||
vector_icon = material_icon("show_chart", size=(ICON_SIZE, ICON_SIZE), color="#2980b9")
|
||||
array_icon = material_icon("stacked_line_chart", size=(ICON_SIZE, ICON_SIZE), color="#2980b9")
|
||||
scalar_icon = material_icon("point_scan", size=(ICON_SIZE, ICON_SIZE), color="#2980b9")
|
||||
str_icon = material_icon("text_snippet", size=(ICON_SIZE, ICON_SIZE), color="#2980b9")
|
||||
|
||||
if not isinstance(h5_obj, h5py.Group):
|
||||
return
|
||||
|
||||
for key in h5_obj.keys():
|
||||
try:
|
||||
child = h5_obj[key]
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
if isinstance(child, h5py.Group):
|
||||
item = QTreeWidgetItem(parent_item, [key, "Group", ""])
|
||||
item.setIcon(0, folder_icon)
|
||||
|
||||
item.setData(0, Qt.UserRole, child.name)
|
||||
item.setData(0, Qt.UserRole + 1, "group")
|
||||
|
||||
item.setForeground(0, QColor(ACCENT))
|
||||
|
||||
populate_tree(item, child)
|
||||
|
||||
elif isinstance(child, h5py.Dataset):
|
||||
shape_str = "x".join(str(s) for s in child.shape) or "scalar"
|
||||
|
||||
dtype = dtype_tag(child)
|
||||
|
||||
if shape_str == "scalar":
|
||||
if dtype == "str":
|
||||
icon = str_icon
|
||||
else:
|
||||
icon = scalar_icon
|
||||
elif "x" in shape_str:
|
||||
icon = array_icon
|
||||
else:
|
||||
icon = vector_icon
|
||||
|
||||
item = QTreeWidgetItem(parent_item, [key, dtype, shape_str])
|
||||
item.setIcon(0, icon)
|
||||
|
||||
item.setData(0, Qt.UserRole, child.name)
|
||||
item.setData(0, Qt.UserRole + 1, "dataset")
|
||||
|
||||
item.setForeground(1, QColor(SUCCESS))
|
||||
item.setForeground(2, QColor(TEXT_SEC))
|
||||
|
||||
|
||||
# ── Data / Plot panel ──────────────────────────────────────────────────────
|
||||
class DataPanel(QWidget):
|
||||
@@ -171,8 +78,11 @@ class DataPanel(QWidget):
|
||||
self.stack_layout.setContentsMargins(0, 0, 0, 0)
|
||||
self._layout.addWidget(self.stack, 1)
|
||||
|
||||
self.plot_widget = None
|
||||
self.image_widget = None
|
||||
|
||||
self._current_data = None
|
||||
self._show_empty()
|
||||
self.show_empty()
|
||||
|
||||
# ── helpers ───────────────────────────────────────────────────────────
|
||||
def _clear_stack(self):
|
||||
@@ -180,12 +90,14 @@ class DataPanel(QWidget):
|
||||
item = self.stack_layout.takeAt(0)
|
||||
if item.widget():
|
||||
item.widget().deleteLater()
|
||||
self.plot_widget = None
|
||||
self.image_widget = None
|
||||
|
||||
def _show_empty(self):
|
||||
def show_empty(self):
|
||||
self._clear_stack()
|
||||
lbl = QLabel("No data selected")
|
||||
lbl.setObjectName("info_label")
|
||||
lbl.setAlignment(Qt.AlignCenter)
|
||||
lbl.setAlignment(Qt.AlignmentFlag.AlignCenter)
|
||||
self.stack_layout.addWidget(lbl)
|
||||
|
||||
def _on_mode_change(self):
|
||||
@@ -222,9 +134,7 @@ class DataPanel(QWidget):
|
||||
else:
|
||||
mode = "table"
|
||||
|
||||
if (
|
||||
mode == "plot"
|
||||
): # TODO disable plot and image if data is str, maybe completely disable manual mode and only allow image/plot for arrays
|
||||
if mode == "plot":
|
||||
self._show_plot_1d(data)
|
||||
elif mode == "image":
|
||||
self._show_image_2d(data)
|
||||
@@ -245,47 +155,44 @@ class DataPanel(QWidget):
|
||||
|
||||
x = np.arange(row_data.size, dtype=np.float32)
|
||||
|
||||
pw = _styled_plot_widget()
|
||||
pw.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
|
||||
self.plot_widget = pg.PlotWidget()
|
||||
plot_item = self.plot_widget.getPlotItem()
|
||||
assert plot_item is not None, "PlotWidget has no PlotItem"
|
||||
plot_item.showGrid(x=True, y=True, alpha=0.25)
|
||||
self.plot_widget.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Expanding)
|
||||
|
||||
plot_item = pw.getPlotItem()
|
||||
plot_item.setAutoVisible(y=False)
|
||||
plot_item.setAutoVisible(y=False) # type: ignore[attr-defined]
|
||||
|
||||
curve = pg.PlotDataItem(
|
||||
x, row_data, pen=pg.mkPen(color=PG_ACCENT, width=1.6), antialias=False
|
||||
x, row_data, pen=pg.mkPen(color="#2980b9", width=1.6), antialias=False
|
||||
)
|
||||
pw.addItem(curve)
|
||||
self.plot_widget.addItem(curve)
|
||||
|
||||
curve.setDownsampling(auto=True, method="peak")
|
||||
curve.setClipToView(True)
|
||||
curve.setSkipFiniteCheck(True)
|
||||
|
||||
# plot_item.setLabel("bottom", "index")
|
||||
# plot_item.setLabel("left", "value")
|
||||
plot_item.enableAutoRange()
|
||||
plot_item.enableAutoRange() # type: ignore[attr-defined]
|
||||
|
||||
if is_2d:
|
||||
# --- Slider ---
|
||||
slider = QSlider(Qt.Vertical)
|
||||
slider = QSlider(Qt.Orientation.Vertical)
|
||||
slider.setMinimum(0)
|
||||
slider.setMaximum(n_rows - 1)
|
||||
slider.setValue(0)
|
||||
# slider.setInvertedAppearance(True) # row 0 at top
|
||||
slider.setFixedWidth(32)
|
||||
# slider.setSingleStep(1)
|
||||
slider.setPageStep(1)
|
||||
|
||||
row_label = QLabel("0")
|
||||
row_label.setAlignment(Qt.AlignCenter)
|
||||
row_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
|
||||
row_label.setFixedWidth(32)
|
||||
# row_label.setStyleSheet("color: #aaa; font-size: 10px;")
|
||||
|
||||
def on_row_changed(row):
|
||||
row_label.setText(str(row))
|
||||
new_data = data[row].astype(np.float32)
|
||||
new_x = np.arange(new_data.size, dtype=np.float32)
|
||||
curve.setData(new_x, new_data)
|
||||
plot_item.enableAutoRange()
|
||||
plot_item.enableAutoRange() # type: ignore[attr-defined]
|
||||
|
||||
slider.valueChanged.connect(on_row_changed)
|
||||
|
||||
@@ -302,11 +209,45 @@ class DataPanel(QWidget):
|
||||
h_layout.setContentsMargins(0, 0, 0, 0)
|
||||
h_layout.setSpacing(4)
|
||||
h_layout.addWidget(slider_col)
|
||||
h_layout.addWidget(pw)
|
||||
h_layout.addWidget(self.plot_widget)
|
||||
|
||||
self.stack_layout.addWidget(container)
|
||||
else:
|
||||
self.stack_layout.addWidget(pw)
|
||||
self.stack_layout.addWidget(self.plot_widget)
|
||||
|
||||
self.apply_theme()
|
||||
|
||||
def apply_theme(self, theme: Optional[Literal["dark", "light"]] = None):
|
||||
"""
|
||||
Apply the theme
|
||||
|
||||
Args:
|
||||
theme (Optional[str]): Theme, either "dark", "light", or None. Defaults to None.
|
||||
"""
|
||||
if theme is None:
|
||||
app = QApplication.instance()
|
||||
theme = app.theme.theme # type: ignore
|
||||
|
||||
bg_color = pg.getConfigOption("background")
|
||||
fg_color = pg.getConfigOption("foreground")
|
||||
if self.plot_widget is not None:
|
||||
n_curves = len(self.plot_widget.listDataItems())
|
||||
colors = Colors.golden_angle_color(
|
||||
colormap="plasma", num=max(10, n_curves + 1), format="HEX"
|
||||
)
|
||||
for idx, curve in enumerate(self.plot_widget.listDataItems()):
|
||||
curve.setPen(pg.mkPen(color=colors[idx]))
|
||||
# Background
|
||||
self.plot_widget.setBackground(bg_color)
|
||||
# Axes (tick marks, tick labels, axis line)
|
||||
for axis in ["left", "bottom", "right", "top"]:
|
||||
ax = self.plot_widget.getAxis(axis)
|
||||
ax.setPen(pg.mkPen(color=fg_color))
|
||||
ax.setTextPen(pg.mkPen(color=fg_color))
|
||||
|
||||
if self.image_widget is not None:
|
||||
self.image_widget.getView().setBackgroundColor(bg_color)
|
||||
self.image_widget.ui.histogram.setBackground(bg_color)
|
||||
|
||||
# ── 2-D image ──────────────────────────────────────────────────────────
|
||||
def _show_image_2d(self, data):
|
||||
@@ -320,20 +261,20 @@ class DataPanel(QWidget):
|
||||
img_data = np.abs(squeezed) if np.iscomplexobj(squeezed) else squeezed.astype(float)
|
||||
|
||||
# ImageView gives us colorbar + histogram + zoom for free
|
||||
iv = pg.ImageView()
|
||||
iv.getView().setBackgroundColor(pg.mkColor(*PG_BG))
|
||||
iv.ui.histogram.setBackground(pg.mkColor(*PG_PLOT_BG))
|
||||
iv.ui.roiBtn.hide()
|
||||
iv.ui.menuBtn.hide()
|
||||
self.image_widget = pg.ImageView()
|
||||
self.image_widget.ui.roiBtn.hide()
|
||||
self.image_widget.ui.menuBtn.hide()
|
||||
|
||||
# Use 'inferno'-like LUT
|
||||
iv.setColorMap(pg.colormap.get("inferno", source="matplotlib"))
|
||||
self.image_widget.setColorMap(pg.colormap.get("inferno", source="matplotlib"))
|
||||
|
||||
# pyqtgraph ImageView expects (cols, rows) — transpose so row 0 is at top
|
||||
iv.setImage(img_data.T, autoLevels=True, autoHistogramRange=True)
|
||||
iv.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
|
||||
self.image_widget.setImage(img_data.T, autoLevels=True, autoHistogramRange=True)
|
||||
self.image_widget.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Expanding)
|
||||
|
||||
self.stack_layout.addWidget(iv)
|
||||
self.stack_layout.addWidget(self.image_widget)
|
||||
|
||||
self.apply_theme()
|
||||
|
||||
# ── Table ──────────────────────────────────────────────────────────────
|
||||
def _show_table(self, data):
|
||||
@@ -357,13 +298,13 @@ class DataPanel(QWidget):
|
||||
if rows > MAX_ROWS or cols > MAX_COLS:
|
||||
note = QLabel(f"⚠ Showing {show_rows}/{rows} rows × {show_cols}/{cols} cols")
|
||||
note.setObjectName("info_label")
|
||||
note.setAlignment(Qt.AlignCenter)
|
||||
note.setAlignment(Qt.AlignmentFlag.AlignCenter)
|
||||
self.stack_layout.addWidget(note)
|
||||
|
||||
table = QTableWidget(show_rows, show_cols)
|
||||
table.setEditTriggers(QAbstractItemView.NoEditTriggers)
|
||||
table.setSelectionMode(QAbstractItemView.ContiguousSelection)
|
||||
table.horizontalHeader().setSectionResizeMode(QHeaderView.Interactive)
|
||||
table.setEditTriggers(QAbstractItemView.EditTrigger.NoEditTriggers)
|
||||
table.setSelectionMode(QAbstractItemView.SelectionMode.ContiguousSelection)
|
||||
table.horizontalHeader().setSectionResizeMode(QHeaderView.ResizeMode.Interactive)
|
||||
table.horizontalHeader().setDefaultSectionSize(90)
|
||||
table.verticalHeader().setDefaultSectionSize(22)
|
||||
table.setFont(QFont("JetBrains Mono, Consolas, monospace", 10))
|
||||
@@ -381,7 +322,7 @@ class DataPanel(QWidget):
|
||||
else f"{val:.4g}" if is_complex else str(val.decode()) if is_bytes else str(val)
|
||||
)
|
||||
cell = QTableWidgetItem(txt)
|
||||
cell.setTextAlignment(Qt.AlignRight | Qt.AlignVCenter)
|
||||
cell.setTextAlignment(Qt.AlignmentFlag.AlignRight | Qt.AlignmentFlag.AlignVCenter)
|
||||
table.setItem(r, c, cell)
|
||||
|
||||
self.stack_layout.addWidget(table)
|
||||
@@ -396,6 +337,15 @@ class HDF5Viewer(QMainWindow):
|
||||
if filepath:
|
||||
self.load_files([filepath])
|
||||
|
||||
def apply_theme(self, theme: Literal["dark", "light"]):
|
||||
"""
|
||||
Apply the theme
|
||||
|
||||
Args:
|
||||
theme (str): Theme, either "dark" or "light"
|
||||
"""
|
||||
self.data_panel.apply_theme(theme)
|
||||
|
||||
def load_files(self, filepaths: list[str]):
|
||||
"""Open one or more HDF5 files and add each as a top-level tree node."""
|
||||
for f in filepaths:
|
||||
@@ -410,7 +360,7 @@ class HDF5Viewer(QMainWindow):
|
||||
|
||||
root_layout = QHBoxLayout(central)
|
||||
|
||||
splitter = QSplitter(Qt.Horizontal)
|
||||
splitter = QSplitter(Qt.Orientation.Horizontal)
|
||||
splitter.setChildrenCollapsible(False)
|
||||
|
||||
# ── Left pane ──
|
||||
@@ -418,11 +368,12 @@ class HDF5Viewer(QMainWindow):
|
||||
left_layout = QVBoxLayout(left_pane)
|
||||
|
||||
self.tree = QTreeWidget()
|
||||
self.tree.setMinimumWidth(250)
|
||||
self.tree.setHeaderLabels(["Name", "Type", "Shape"])
|
||||
self.tree.header().setStretchLastSection(False)
|
||||
self.tree.header().setSectionResizeMode(0, QHeaderView.Stretch)
|
||||
self.tree.header().setSectionResizeMode(1, QHeaderView.ResizeToContents)
|
||||
self.tree.header().setSectionResizeMode(2, QHeaderView.ResizeToContents)
|
||||
self.tree.header().setSectionResizeMode(0, QHeaderView.ResizeMode.Stretch)
|
||||
self.tree.header().setSectionResizeMode(1, QHeaderView.ResizeMode.ResizeToContents)
|
||||
self.tree.header().setSectionResizeMode(2, QHeaderView.ResizeMode.ResizeToContents)
|
||||
self.tree.itemClicked.connect(self._on_item_clicked)
|
||||
|
||||
left_layout.addWidget(self.tree, 1)
|
||||
@@ -446,15 +397,16 @@ class HDF5Viewer(QMainWindow):
|
||||
h5file = self.h5files[filepath]
|
||||
filename = filepath.split("/")[-1]
|
||||
|
||||
dataset_icon = material_icon("dataset", size=(ICON_SIZE, ICON_SIZE), color="#2980b9")
|
||||
dataset_icon = material_icon(
|
||||
"dataset", size=(ICON_SIZE, ICON_SIZE), color=get_accent_colors().default.name()
|
||||
)
|
||||
root_item = QTreeWidgetItem(self.tree, [filename, "Group", ""])
|
||||
root_item.setIcon(0, dataset_icon)
|
||||
root_item.setData(0, Qt.UserRole, "/")
|
||||
root_item.setData(0, Qt.UserRole + 1, "group")
|
||||
root_item.setData(0, Qt.UserRole + 2, filepath) # so clicks know which file
|
||||
root_item.setForeground(0, QColor(ACCENT))
|
||||
root_item.setData(0, Qt.ItemDataRole.UserRole, "/")
|
||||
root_item.setData(0, Qt.ItemDataRole.UserRole + 1, "group")
|
||||
root_item.setData(0, Qt.ItemDataRole.UserRole + 2, filepath) # so clicks know which file
|
||||
|
||||
populate_tree(root_item, h5file)
|
||||
self.populate_tree(root_item, h5file)
|
||||
self.tree.addTopLevelItem(root_item)
|
||||
|
||||
# Expand first 2 levels
|
||||
@@ -464,16 +416,76 @@ class HDF5Viewer(QMainWindow):
|
||||
|
||||
self.tree.setCurrentItem(root_item)
|
||||
|
||||
def populate_tree(self, parent_item, h5_obj):
|
||||
folder_icon = material_icon("folder", size=(ICON_SIZE, ICON_SIZE), color="#2980b9")
|
||||
vector_icon = material_icon("show_chart", size=(ICON_SIZE, ICON_SIZE), color="#2980b9")
|
||||
array_icon = material_icon(
|
||||
"stacked_line_chart", size=(ICON_SIZE, ICON_SIZE), color="#2980b9"
|
||||
)
|
||||
scalar_icon = material_icon("point_scan", size=(ICON_SIZE, ICON_SIZE), color="#2980b9")
|
||||
str_icon = material_icon("text_snippet", size=(ICON_SIZE, ICON_SIZE), color="#2980b9")
|
||||
|
||||
if not isinstance(h5_obj, h5py.Group):
|
||||
return
|
||||
|
||||
for key in h5_obj.keys():
|
||||
try:
|
||||
child = h5_obj[key]
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
if isinstance(child, h5py.Group):
|
||||
item = QTreeWidgetItem(parent_item, [key, "Group", ""])
|
||||
item.setIcon(0, folder_icon)
|
||||
|
||||
item.setData(0, Qt.ItemDataRole.UserRole, child.name)
|
||||
item.setData(0, Qt.ItemDataRole.UserRole + 1, "group")
|
||||
|
||||
self.populate_tree(item, child)
|
||||
|
||||
elif isinstance(child, h5py.Dataset):
|
||||
shape_str = "x".join(str(s) for s in child.shape) or "scalar"
|
||||
|
||||
d = child.dtype
|
||||
dtype = "unknown"
|
||||
if np.issubdtype(d, np.integer):
|
||||
dtype = "int"
|
||||
if np.issubdtype(d, np.floating):
|
||||
dtype = "float"
|
||||
if np.issubdtype(d, np.complexfloating):
|
||||
dtype = "complex"
|
||||
if d.kind in ("S", "U", "O"):
|
||||
dtype = "str"
|
||||
|
||||
if shape_str == "scalar":
|
||||
if dtype == "str":
|
||||
icon = str_icon
|
||||
else:
|
||||
icon = scalar_icon
|
||||
elif "x" in shape_str:
|
||||
icon = array_icon
|
||||
else:
|
||||
icon = vector_icon
|
||||
|
||||
item = QTreeWidgetItem(parent_item, [key, dtype, shape_str])
|
||||
item.setIcon(0, icon)
|
||||
|
||||
item.setData(0, Qt.ItemDataRole.UserRole, child.name)
|
||||
item.setData(0, Qt.ItemDataRole.UserRole + 1, "dataset")
|
||||
|
||||
item.setForeground(1, QBrush(QColor(get_accent_colors().success.name())))
|
||||
item.setForeground(2, QBrush(QColor("#656365")))
|
||||
|
||||
def _get_filepath_for_item(self, item: QTreeWidgetItem) -> str:
|
||||
"""Walk up the tree to find the filepath stored on the root node."""
|
||||
node = item
|
||||
while node.parent():
|
||||
node = node.parent()
|
||||
return node.data(0, Qt.UserRole + 2)
|
||||
return node.data(0, Qt.ItemDataRole.UserRole + 2)
|
||||
|
||||
def _on_item_clicked(self, item, _col):
|
||||
path = item.data(0, Qt.UserRole)
|
||||
kind = item.data(0, Qt.UserRole + 1)
|
||||
path = item.data(0, Qt.ItemDataRole.UserRole)
|
||||
kind = item.data(0, Qt.ItemDataRole.UserRole + 1)
|
||||
filepath = self._get_filepath_for_item(item)
|
||||
|
||||
if not path or not filepath:
|
||||
@@ -494,7 +506,7 @@ class HDF5Viewer(QMainWindow):
|
||||
h5file.close()
|
||||
self.h5files.clear()
|
||||
self.tree.clear()
|
||||
self.data_panel._show_empty() # TODO: If it works, make function public!
|
||||
self.data_panel.show_empty()
|
||||
|
||||
def closeEvent(self, event):
|
||||
for h5file in self.h5files.values():
|
||||
|
||||
@@ -9,7 +9,4 @@ designer_plugins = {
|
||||
"DigitalTwin": ("debye_bec.bec_widgets.widgets.digital_twin.digital_twin", "DigitalTwin"),
|
||||
}
|
||||
|
||||
widget_icons = {
|
||||
"DataViewer": "lightbulb",
|
||||
"DigitalTwin": "lightbulb",
|
||||
}
|
||||
widget_icons = {"DataViewer": "find_in_page", "DigitalTwin": "lightbulb"}
|
||||
|
||||
@@ -256,6 +256,7 @@ def fm_ideal_pitch(
|
||||
Returns:
|
||||
tuple[float, float | None]: Pitch of mirror in rad, qy in mm
|
||||
"""
|
||||
p_cm = bl.cm.center[1] # posCM
|
||||
p = bl.fm.center[1] # posFM
|
||||
q = smpl - bl.fm.center[1] # dist posFM to posEX
|
||||
if fm_focus == "Defocused":
|
||||
@@ -264,14 +265,25 @@ def fm_ideal_pitch(
|
||||
assert fm_focx is not None, "fm_focx must be provided for Defocused mode"
|
||||
assert fm_focy is not None, "fm_focy must be provided for Defocused mode"
|
||||
a = 2 * np.tan(sldi_hacc) * bl.fm.center[1] # Beam width at focusing mirror
|
||||
# logger.info(f"a: {a}")
|
||||
# logger.info(f"sldi_hacc: {sldi_hacc}")
|
||||
# logger.info(f"bl.fm.center[1]: {bl.fm.center[1]}")
|
||||
# logger.info(f"p: {p}")
|
||||
# logger.info(f"q: {q}")
|
||||
b = (
|
||||
2 * np.tan(sldi_vacc) * bl.cm.center[1]
|
||||
) # Beam height at focusing mirror (collimated beam)
|
||||
x = fm_focx
|
||||
# logger.info(f"x: {x}")
|
||||
x = 0.098821 * x**2 + 0.512344 * x # polynom to correct for spot size
|
||||
# logger.info(f"x (corrected): {x}")
|
||||
y = fm_focy
|
||||
y = 3.183562 * y**2 + 1.258364 * y # polynom to correct for spot size
|
||||
qx = q + x * p / a
|
||||
qy = q + y * p / b
|
||||
qy = q + y * p_cm / b
|
||||
f = (p * qx) / (p + qx) # focal length
|
||||
# logger.info(f"qx: {qx}")
|
||||
# logger.info(f"f: {f}")
|
||||
else: # Calculate for focused beam on sample in "manual" and "focused" mode
|
||||
qy = None
|
||||
f = (p * q) / (p + q) # focal length
|
||||
@@ -280,6 +292,7 @@ def fm_ideal_pitch(
|
||||
pitch = np.arcsin(bl.fm.r[0] / (2 * f)) # ideal pitch for FM
|
||||
if "Pt" in fm_stripe:
|
||||
pitch = np.arcsin(bl.fm.r[1] / (2 * f)) # ideal pitch for FM
|
||||
# logger.info(f"pitch: {pitch}")
|
||||
return pitch, qy
|
||||
|
||||
|
||||
|
||||
@@ -65,6 +65,7 @@ logger = bec_logger.logger
|
||||
OFFSET_FILE_X01DA = Path(__file__).with_name("x01da_offsets.yaml")
|
||||
OFFSET_FILE_X10DA = Path(__file__).with_name("x10da_offsets.yaml")
|
||||
|
||||
|
||||
class DigitalTwin(BECWidget, QWidget):
|
||||
"""
|
||||
Main widget of Digital Twin
|
||||
@@ -224,12 +225,12 @@ class DigitalTwin(BECWidget, QWidget):
|
||||
if choice in ["yes", "y"]:
|
||||
bl = input(f"Choose from: {[bl.value for bl in BeamlineId]}")
|
||||
if bl in BeamlineId:
|
||||
logger.info(f'Manually selected beamline {bl}')
|
||||
logger.info(f"Manually selected beamline {bl}")
|
||||
return BeamlineId(bl)
|
||||
else:
|
||||
raise ValueError(f'Wrong selection {bl}')
|
||||
raise ValueError(f"Wrong selection {bl}")
|
||||
else:
|
||||
raise ValueError('Cannot open digital twin without a beamline')
|
||||
raise ValueError("Cannot open digital twin without a beamline")
|
||||
|
||||
@SafeSlot()
|
||||
def check_bec_config(self, *args):
|
||||
@@ -362,6 +363,10 @@ class DigitalTwin(BECWidget, QWidget):
|
||||
self.calc_fm_reflectivity()
|
||||
self.calc_cm_fm_harm_suppr()
|
||||
self.calc_mo1_energy_resolution()
|
||||
case "hacc":
|
||||
self.calc_fm_ideal_pitch()
|
||||
case "vacc":
|
||||
self.calc_fm_ideal_pitch()
|
||||
case "cm_stripe":
|
||||
self.calc_cm_crit_pitch()
|
||||
self.calc_cm_reflectivity()
|
||||
@@ -672,7 +677,7 @@ class DigitalTwin(BECWidget, QWidget):
|
||||
# Load offsets
|
||||
if not self.offset_file.exists():
|
||||
raise FileNotFoundError(f"Offset file not found: {self.offset_file}")
|
||||
|
||||
|
||||
with self.offset_file.open("r", encoding="utf-8") as f:
|
||||
data = yaml.safe_load(f)
|
||||
|
||||
|
||||
@@ -5,43 +5,43 @@ mo1_trx:
|
||||
modifier:
|
||||
axis: mo1_trx
|
||||
range: [[-30, -0.1], [0.1, 30]]
|
||||
offset: [0, 2.21]
|
||||
offset: [-2.3, 1.31]
|
||||
|
||||
mo1_try:
|
||||
modifier:
|
||||
axis: mo1_trx
|
||||
range: [[-30, -0.1], [0.1, 30]]
|
||||
offset: [0, -1.6]
|
||||
offset: [-1.78, -1.78]
|
||||
|
||||
sl1_centery:
|
||||
offset: -1.8
|
||||
offset: -1.2
|
||||
|
||||
fm_trx:
|
||||
modifier:
|
||||
axis: fm_trx
|
||||
range: [[-66, -31], [-24, 7], [11, 31], [38, 66]]
|
||||
offset: [0, 0, 0, -0.16]
|
||||
offset: [-0.61, 0, 0, -0.16]
|
||||
|
||||
fm_try:
|
||||
modifier:
|
||||
axis: fm_trx
|
||||
range: [[-66, -31], [-24, 7], [11, 31], [38, 66]]
|
||||
offset: [0, 0, 0, -0.45]
|
||||
offset: [0.028, 0, 0, -0.45]
|
||||
|
||||
fm_rotx:
|
||||
modifier:
|
||||
axis: fm_trx
|
||||
range: [[-66, -31], [-24, 7], [11, 31], [38, 66]]
|
||||
offset: [0, 0, 0, 0.063]
|
||||
offset: [0.027, 0, 0, 0.045]
|
||||
|
||||
fm_roty:
|
||||
modifier:
|
||||
axis: fm_trx
|
||||
range: [[-66, -31], [-24, 7], [11, 31], [38, 66]]
|
||||
offset: [0, 0, 0, -0.04]
|
||||
offset: [-0.038, 0, 0, -0.043]
|
||||
|
||||
sl2_centery:
|
||||
offset: 1.2
|
||||
offset: -0.7
|
||||
|
||||
ot_try:
|
||||
offset: 0
|
||||
|
||||
@@ -70,10 +70,9 @@ xas_config:
|
||||
#xrd_config:
|
||||
# - !include ./x01da_xrd.yaml
|
||||
|
||||
# Commented out because too slow
|
||||
## Hutch cameras
|
||||
# hutch_cams:
|
||||
# - !include ./x01da_hutch_cameras.yaml
|
||||
hutch_cams:
|
||||
- !include ./x01da_hutch_cameras.yaml
|
||||
|
||||
## Remaining experimental hutch
|
||||
es_config:
|
||||
|
||||
@@ -6,9 +6,13 @@ import threading
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
import requests
|
||||
from bec_lib.file_utils import get_full_path
|
||||
from bec_lib.logger import bec_logger
|
||||
from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd import Kind, Signal
|
||||
from ophyd_devices import DeviceStatus
|
||||
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
|
||||
|
||||
@@ -16,7 +20,6 @@ from debye_bec.devices.utils.utils import fetch_scan_info
|
||||
|
||||
if TYPE_CHECKING: # pragma: no cover
|
||||
from bec_lib.devicemanager import ScanInfo
|
||||
from bec_lib.messages import ScanStatusMessage
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
@@ -28,59 +31,76 @@ CAM_PORT = 554
|
||||
class HutchCam(PSIDeviceBase):
|
||||
"""Class for the Hutch Cameras"""
|
||||
|
||||
# image = Cpt(Signal, name='image', kind='config')
|
||||
USER_ACCESS = ["acquire_from_image"]
|
||||
|
||||
image = Cpt(Signal, name="image", kind=Kind.config)
|
||||
|
||||
def __init__(self, *, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs):
|
||||
super().__init__(name=name, scan_info=scan_info, **kwargs)
|
||||
self.scan_parameters: ScanServerScanInfo = None
|
||||
self.hostname = prefix
|
||||
self.status = None
|
||||
self.name = ""
|
||||
|
||||
# pylint: disable=E1101
|
||||
def on_connected(self) -> None:
|
||||
"""
|
||||
Called after the device is connected and its signals are connected.
|
||||
Default values for signals should be set here.
|
||||
Retrieve camera name which also makes sure the camera is connected.
|
||||
"""
|
||||
rtsp_url = f"rtsp://{CAM_USERNAME}:{CAM_PASSWORD}@{self.hostname}.psi.ch:{CAM_PORT}/rtpstream/config1"
|
||||
cap = cv2.VideoCapture(f"{rtsp_url}?tcp")
|
||||
if not cap.isOpened():
|
||||
logger.error(self, "Connection Failed", "Could not connect to the camera stream.")
|
||||
return
|
||||
cap.release()
|
||||
info_url = f"http://{CAM_USERNAME}:{CAM_PASSWORD}@{self.hostname}.psi.ch/-wvhttp-01-/info.cgi?item=c.1.name"
|
||||
response = requests.get(info_url, timeout=5)
|
||||
response.raise_for_status()
|
||||
self.name = response.content.decode("utf-8").split("c.1.name.utf8:=")[-1].strip().lower()
|
||||
|
||||
def on_stage(self) -> DeviceStatus:
|
||||
"""Called while staging the device."""
|
||||
self.scan_parameters = fetch_scan_info(self.scan_info)
|
||||
file_path = get_full_path(self.scan_info, name="hutch_cam_" + self.hostname).removesuffix(
|
||||
"h5"
|
||||
)
|
||||
|
||||
self.status = DeviceStatus(self)
|
||||
file_path = get_full_path(self.scan_info.msg, name=self.name).removesuffix("h5")
|
||||
|
||||
thread = threading.Thread(
|
||||
target=self._save_picture, args=(file_path, self.status), daemon=True
|
||||
target=self.acquire_and_save_from_video, args=(file_path,), daemon=True
|
||||
)
|
||||
thread.start()
|
||||
|
||||
return self.status
|
||||
return None
|
||||
|
||||
def _save_picture(self, file_path, status):
|
||||
def acquire_from_image(self):
|
||||
"""
|
||||
Acquire an image from the image endpoint of the camera. Resolution is
|
||||
1280 x 720 px and cannot be changed. Acquisition is fast.
|
||||
"""
|
||||
logger.debug(f"Capture from camera {self.hostname}")
|
||||
image_url = (
|
||||
f"http://{CAM_USERNAME}:{CAM_PASSWORD}@{self.hostname}.psi.ch/-wvhttp-01-/image.cgi"
|
||||
)
|
||||
response = requests.get(image_url, timeout=5)
|
||||
response.raise_for_status()
|
||||
img = cv2.imdecode(
|
||||
np.frombuffer(response.content, np.uint8), cv2.IMREAD_COLOR
|
||||
) # Or IMREAD_GRAYSCALE
|
||||
self.image.put(img)
|
||||
|
||||
def acquire_and_save_from_video(self, file_path):
|
||||
"""
|
||||
Acquire an image from a videostream. Videostream resolution can be changed in the camera settings.
|
||||
Acquisition is slow, as opening the video stream can take a few seconds.
|
||||
|
||||
Args:
|
||||
file_path (str): File path including filename where image will be saved.
|
||||
"""
|
||||
try:
|
||||
logger.info(f"Capture from camera {self.hostname}")
|
||||
logger.debug(f"Capture from camera {self.hostname}")
|
||||
rtsp_url = f"rtsp://{CAM_USERNAME}:{CAM_PASSWORD}@{self.hostname}.psi.ch:{CAM_PORT}/rtpstream/config1"
|
||||
cap = cv2.VideoCapture(f"{rtsp_url}?tcp")
|
||||
if not cap.isOpened():
|
||||
logger.error("Connection Failed", "Could not connect to the camera stream.")
|
||||
return
|
||||
logger.info(f"Connection to camera {self.hostname} established")
|
||||
ret, frame = cap.readAsync()
|
||||
logger.debug(f"Connection to camera {self.hostname} established")
|
||||
ret, frame = cap.read()
|
||||
cap.release()
|
||||
if not ret:
|
||||
logger.error("Capture Failed", "Failed to capture image from camera.")
|
||||
return
|
||||
cv2.imwrite(file_path + "png", frame)
|
||||
status.set_finished()
|
||||
logger.info(f"Capture from camera {self.hostname} done")
|
||||
logger.debug(f"Capture from camera {self.hostname} done")
|
||||
except Exception as e:
|
||||
status.set_exception(e)
|
||||
logger.error(f"Could not acquire image from video stream: {e}")
|
||||
|
||||
Reference in New Issue
Block a user