widget development and hutch cameras fix #86

Merged
hitz_s merged 9 commits from fix-hutch_cameras into main 2026-06-22 11:43:37 +02:00
8 changed files with 314 additions and 221 deletions
@@ -2,9 +2,12 @@
Data Viewer: Custom BEC widget to view data from scans.
"""
import os
import subprocess
import sys
from datetime import datetime
from functools import partial
from typing import Literal, Optional, cast
from bec_lib import bec_logger
from bec_lib.endpoints import MessageEndpoints
@@ -36,6 +39,8 @@ from debye_bec.bec_widgets.widgets.data_viewer.viewer import HDF5Viewer
logger = bec_logger.logger
MAX_HIST_LEN = 100
class DataViewer(BECWidget, QWidget):
"""
@@ -43,12 +48,14 @@ class DataViewer(BECWidget, QWidget):
"""
PLUGIN = True
ICON_NAME = "lightbulb"
ICON_NAME = "find_in_page"
def __init__(self, *arg, parent=None, **kwargs):
super().__init__(parent=parent, theme_update=True, *arg, **kwargs)
self.get_bec_shortcuts()
logger.info(f"Type of self.client: {type(self.client)}")
central = QWidget()
self.root_layout = QVBoxLayout(central)
@@ -76,19 +83,44 @@ class DataViewer(BECWidget, QWidget):
self.input.scan_sel.currentItemChanged_connect(self.scan_sel_changed)
self.input.load_button.clicked_connect(self.load_dataset)
self.input.unload_button.clicked_connect(self.unload_all_datasets)
self.input.open_fm_button.clicked_connect(self.open_in_file_manager)
def apply_theme(self, theme: Literal["dark", "light"]):
"""
Apply the theme
Args:
theme (str): Theme, either "dark" or "light"
"""
self.viewer.apply_theme(theme)
self.on_history_update()
@SafeSlot()
def scan_sel_changed(self, *_, **kwargs):
self.current_row = kwargs["value"]().row()
@SafeSlot()
def open_in_file_manager(self, *_):
if len(self.history) > 0:
scan = self.history[self.current_row]
filepath = scan["file_components"][0].decode().rsplit("/", 1)[0]
subprocess.Popen(
["xdg-open", filepath],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
stdin=subprocess.DEVNULL,
start_new_session=True,
)
@SafeSlot()
def load_dataset(
self, *_
): # TODO: Check scan file components for combined xas/xrd scans. Is the Pilatus file in there as well?
scan = self.history[self.current_row]
file = scan["file_components"][0] + b"_master." + scan["file_components"][1]
logger.info(file.decode())
self.viewer.load_files([file.decode()])
if len(self.history) > 0:
scan = self.history[self.current_row]
file = scan["file_components"][0] + b"_master." + scan["file_components"][1]
logger.info(file.decode())
self.viewer.load_files([file.decode()])
@SafeSlot()
def unload_all_datasets(self, *_):
@@ -121,12 +153,12 @@ class DataViewer(BECWidget, QWidget):
self.input.scan_sel.clear()
# Get the length of the scan history, which is 0 when the bec server was started
# and no scan has finished yet. Limit the history to the latest 20 scans.
max_scans = min(len(self.client.history), 20)
for n in range(1, max_scans): # last scans, up to 20
max_scans = min(len(self.client.history), MAX_HIST_LEN)
for n in range(1, max_scans): # last scans, limited by MAX_HIST_LEN
# logger.info(self.client.history[-n].metadata["bec"]["status"])
start_time = self.client.history[-n].metadata["start_time"]
end_time = self.client.history[-n].metadata["end_time"]
logger.info(type(start_time))
# logger.info(type(start_time))
scan_data = self.client.history[-n].metadata["bec"]
# logger.info(scan_data)
scan_number = scan_data["scan_number"]
@@ -147,20 +179,20 @@ class DataViewer(BECWidget, QWidget):
}
)
tags = []
tags.append((scan_name, "#4A90D9"))
tags.append((scan_name, get_accent_colors().default.name()))
if sample_name != "":
tags.append((sample_name, "#4A10D9"))
tags.append((sample_name, get_accent_colors().highlight.name()))
if comment != "":
tags.append((comment, "#FA10D9"))
tags.append((comment, get_accent_colors().warning.name()))
if status == "closed":
tags.append((status, "#1F7023"))
tags.append((status, get_accent_colors().success.name()))
elif status == "halted":
tags.append((status, "#A33047"))
tags.append((status, get_accent_colors().emergency.name()))
else:
tags.append((status, "#656365"))
tags.append((self.duration_string(start_time, end_time), "#656365"))
self.input.scan_sel.addTaggedItem(label=str(scan_number), tags=tags)
logger.info(f"Scan history: {self.history}")
# logger.info(f"Scan history: {self.history}")
class InputPanel(QWidget):
@@ -168,29 +200,44 @@ class InputPanel(QWidget):
def __init__(self, parent=None):
super().__init__(parent)
self._layout = QVBoxLayout(self)
self._layout.setSizeConstraint(QLayout.SetFixedSize) # type: ignore
self._layout = QHBoxLayout(self)
# self._layout.setSizeConstraint(QLayout.SetFixedSize) # type: ignore
# Scan selection
self.scan_sel = ListWidget("scan_sel", "Scan", ["Si", "Rh", "Pt"])
self.load_button = Button(label_button="Load Dataset", enabled=True)
self.unload_button = Button(label_button="Unload all", enabled=True)
self.open_fm_button = Button(label_button="Open in File Manager", enabled=True)
self._button_layout = QVBoxLayout()
self._button_layout.addWidget(self.load_button)
self._button_layout.addWidget(self.unload_button)
self._button_layout.addWidget(self.open_fm_button)
self._button_layout.addStretch()
# Assemble complete scan selection group
self.input_group = Group(
"Scan selection", [self.scan_sel, self.load_button, self.unload_button]
"Scan selection", [self._button_layout, self.scan_sel], orientation="horizontal"
)
self._layout.addWidget(self.input_group)
self._layout.addStretch()
# self._layout.addStretch()
class Group(QGroupBox):
def __init__(self, label, widgets):
def __init__(self, label, objs, orientation="vertical"):
super().__init__(label)
self.layout = QVBoxLayout(self) # type: ignore
for widget in widgets:
self.layout.addWidget(widget) # type: ignore
if orientation == "vertical":
self._layout = QVBoxLayout(self) # type: ignore
elif orientation == "horizontal": # assume horizontal
self._layout = QHBoxLayout(self) # type: ignore
else:
raise ValueError(f"Orientation {orientation} is not supported!")
for obj in objs:
if isinstance(obj, QWidget):
self._layout.addWidget(obj) # type: ignore
elif isinstance(obj, QLayout):
self._layout.addLayout(obj)
class ListWidget(QWidget):
@@ -206,7 +253,7 @@ class ListWidget(QWidget):
# self.label.setWordWrap(True)
# layout.addWidget(self.label)
self.value = TaggedListWidget()
self.value.setFixedWidth(400)
# self.value.setFixedWidth(400)
# for entry in enums:
# self.value.addItem(entry)
layout.addWidget(self.value)
@@ -2,15 +2,23 @@
HDF5 Viewer — qtpy + pyqtgraph + h5py
"""
from typing import Literal, Optional, cast
import h5py
import numpy as np
import pyqtgraph as pg
from bec_lib import bec_logger
from bec_qthemes import material_icon
from bec_widgets.utils.colors import Colors, get_accent_colors
from qtpy.QtCore import Qt
from qtpy.QtGui import QColor, QFont
# pylint: disable=E0611
from qtpy.QtGui import QBrush, QColor, QFont
# pylint: disable=E0611
from qtpy.QtWidgets import (
QAbstractItemView,
QApplication,
QGroupBox,
QHBoxLayout,
QHeaderView,
@@ -30,109 +38,8 @@ from qtpy.QtWidgets import (
logger = bec_logger.logger
# ── Palette ────────────────────────────────────────────────────────────────
ACCENT = "#7C9CF5"
TEXT_SEC = "#8A8FA8"
SUCCESS = "#5FCEA8"
# pyqtgraph color tuples (R, G, B)
PG_BG = (28, 30, 38)
PG_PLOT_BG = (45, 48, 64)
PG_ACCENT = (124, 156, 245)
PG_GRID = (58, 61, 80)
PG_TEXT = (138, 143, 168)
ICON_SIZE = 20
# ── pyqtgraph global config ────────────────────────────────────────────────
pg.setConfigOptions(
background=pg.mkColor(*PG_BG),
foreground=pg.mkColor(*PG_TEXT),
antialias=True,
useOpenGL=False, # safer default; set True for large datasets if GPU available
)
def _styled_plot_widget(**kwargs) -> pg.PlotWidget:
"""Return a PlotWidget already themed to match the dark palette."""
pw = pg.PlotWidget(**kwargs)
# pw.setBackground(pg.mkColor(*PG_PLOT_BG))
ax = pw.getPlotItem()
for side in ("left", "bottom", "right", "top"):
ax.getAxis(side).setPen(pg.mkPen(color=PG_GRID, width=1))
ax.getAxis(side).setTextPen(pg.mkPen(color=PG_TEXT))
ax.showGrid(x=True, y=True, alpha=0.25)
return pw
# ── HDF5 Tree helpers ──────────────────────────────────────────────────────
# TYPE_ICONS = {"group": "📂", "dataset": "📊"}
def dtype_tag(ds):
d = ds.dtype
if np.issubdtype(d, np.integer):
return "int"
if np.issubdtype(d, np.floating):
return "float"
if np.issubdtype(d, np.complexfloating):
return "complex"
if d.kind in ("S", "U", "O"):
return "str"
return str(d)
def populate_tree(parent_item, h5_obj):
folder_icon = material_icon("folder", size=(ICON_SIZE, ICON_SIZE), color="#2980b9")
vector_icon = material_icon("show_chart", size=(ICON_SIZE, ICON_SIZE), color="#2980b9")
array_icon = material_icon("stacked_line_chart", size=(ICON_SIZE, ICON_SIZE), color="#2980b9")
scalar_icon = material_icon("point_scan", size=(ICON_SIZE, ICON_SIZE), color="#2980b9")
str_icon = material_icon("text_snippet", size=(ICON_SIZE, ICON_SIZE), color="#2980b9")
if not isinstance(h5_obj, h5py.Group):
return
for key in h5_obj.keys():
try:
child = h5_obj[key]
except Exception:
continue
if isinstance(child, h5py.Group):
item = QTreeWidgetItem(parent_item, [key, "Group", ""])
item.setIcon(0, folder_icon)
item.setData(0, Qt.UserRole, child.name)
item.setData(0, Qt.UserRole + 1, "group")
item.setForeground(0, QColor(ACCENT))
populate_tree(item, child)
elif isinstance(child, h5py.Dataset):
shape_str = "x".join(str(s) for s in child.shape) or "scalar"
dtype = dtype_tag(child)
if shape_str == "scalar":
if dtype == "str":
icon = str_icon
else:
icon = scalar_icon
elif "x" in shape_str:
icon = array_icon
else:
icon = vector_icon
item = QTreeWidgetItem(parent_item, [key, dtype, shape_str])
item.setIcon(0, icon)
item.setData(0, Qt.UserRole, child.name)
item.setData(0, Qt.UserRole + 1, "dataset")
item.setForeground(1, QColor(SUCCESS))
item.setForeground(2, QColor(TEXT_SEC))
# ── Data / Plot panel ──────────────────────────────────────────────────────
class DataPanel(QWidget):
@@ -171,8 +78,11 @@ class DataPanel(QWidget):
self.stack_layout.setContentsMargins(0, 0, 0, 0)
self._layout.addWidget(self.stack, 1)
self.plot_widget = None
self.image_widget = None
self._current_data = None
self._show_empty()
self.show_empty()
# ── helpers ───────────────────────────────────────────────────────────
def _clear_stack(self):
@@ -180,12 +90,14 @@ class DataPanel(QWidget):
item = self.stack_layout.takeAt(0)
if item.widget():
item.widget().deleteLater()
self.plot_widget = None
self.image_widget = None
def _show_empty(self):
def show_empty(self):
self._clear_stack()
lbl = QLabel("No data selected")
lbl.setObjectName("info_label")
lbl.setAlignment(Qt.AlignCenter)
lbl.setAlignment(Qt.AlignmentFlag.AlignCenter)
self.stack_layout.addWidget(lbl)
def _on_mode_change(self):
@@ -222,9 +134,7 @@ class DataPanel(QWidget):
else:
mode = "table"
if (
mode == "plot"
): # TODO disable plot and image if data is str, maybe completely disable manual mode and only allow image/plot for arrays
if mode == "plot":
self._show_plot_1d(data)
elif mode == "image":
self._show_image_2d(data)
@@ -245,47 +155,44 @@ class DataPanel(QWidget):
x = np.arange(row_data.size, dtype=np.float32)
pw = _styled_plot_widget()
pw.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
self.plot_widget = pg.PlotWidget()
plot_item = self.plot_widget.getPlotItem()
assert plot_item is not None, "PlotWidget has no PlotItem"
plot_item.showGrid(x=True, y=True, alpha=0.25)
self.plot_widget.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Expanding)
plot_item = pw.getPlotItem()
plot_item.setAutoVisible(y=False)
plot_item.setAutoVisible(y=False) # type: ignore[attr-defined]
curve = pg.PlotDataItem(
x, row_data, pen=pg.mkPen(color=PG_ACCENT, width=1.6), antialias=False
x, row_data, pen=pg.mkPen(color="#2980b9", width=1.6), antialias=False
)
pw.addItem(curve)
self.plot_widget.addItem(curve)
curve.setDownsampling(auto=True, method="peak")
curve.setClipToView(True)
curve.setSkipFiniteCheck(True)
# plot_item.setLabel("bottom", "index")
# plot_item.setLabel("left", "value")
plot_item.enableAutoRange()
plot_item.enableAutoRange() # type: ignore[attr-defined]
if is_2d:
# --- Slider ---
slider = QSlider(Qt.Vertical)
slider = QSlider(Qt.Orientation.Vertical)
slider.setMinimum(0)
slider.setMaximum(n_rows - 1)
slider.setValue(0)
# slider.setInvertedAppearance(True) # row 0 at top
slider.setFixedWidth(32)
# slider.setSingleStep(1)
slider.setPageStep(1)
row_label = QLabel("0")
row_label.setAlignment(Qt.AlignCenter)
row_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
row_label.setFixedWidth(32)
# row_label.setStyleSheet("color: #aaa; font-size: 10px;")
def on_row_changed(row):
row_label.setText(str(row))
new_data = data[row].astype(np.float32)
new_x = np.arange(new_data.size, dtype=np.float32)
curve.setData(new_x, new_data)
plot_item.enableAutoRange()
plot_item.enableAutoRange() # type: ignore[attr-defined]
slider.valueChanged.connect(on_row_changed)
@@ -302,11 +209,45 @@ class DataPanel(QWidget):
h_layout.setContentsMargins(0, 0, 0, 0)
h_layout.setSpacing(4)
h_layout.addWidget(slider_col)
h_layout.addWidget(pw)
h_layout.addWidget(self.plot_widget)
self.stack_layout.addWidget(container)
else:
self.stack_layout.addWidget(pw)
self.stack_layout.addWidget(self.plot_widget)
self.apply_theme()
def apply_theme(self, theme: Optional[Literal["dark", "light"]] = None):
"""
Apply the theme
Args:
theme (Optional[str]): Theme, either "dark", "light", or None. Defaults to None.
"""
if theme is None:
app = QApplication.instance()
theme = app.theme.theme # type: ignore
bg_color = pg.getConfigOption("background")
fg_color = pg.getConfigOption("foreground")
if self.plot_widget is not None:
n_curves = len(self.plot_widget.listDataItems())
colors = Colors.golden_angle_color(
colormap="plasma", num=max(10, n_curves + 1), format="HEX"
)
for idx, curve in enumerate(self.plot_widget.listDataItems()):
curve.setPen(pg.mkPen(color=colors[idx]))
# Background
self.plot_widget.setBackground(bg_color)
# Axes (tick marks, tick labels, axis line)
for axis in ["left", "bottom", "right", "top"]:
ax = self.plot_widget.getAxis(axis)
ax.setPen(pg.mkPen(color=fg_color))
ax.setTextPen(pg.mkPen(color=fg_color))
if self.image_widget is not None:
self.image_widget.getView().setBackgroundColor(bg_color)
self.image_widget.ui.histogram.setBackground(bg_color)
# ── 2-D image ──────────────────────────────────────────────────────────
def _show_image_2d(self, data):
@@ -320,20 +261,20 @@ class DataPanel(QWidget):
img_data = np.abs(squeezed) if np.iscomplexobj(squeezed) else squeezed.astype(float)
# ImageView gives us colorbar + histogram + zoom for free
iv = pg.ImageView()
iv.getView().setBackgroundColor(pg.mkColor(*PG_BG))
iv.ui.histogram.setBackground(pg.mkColor(*PG_PLOT_BG))
iv.ui.roiBtn.hide()
iv.ui.menuBtn.hide()
self.image_widget = pg.ImageView()
self.image_widget.ui.roiBtn.hide()
self.image_widget.ui.menuBtn.hide()
# Use 'inferno'-like LUT
iv.setColorMap(pg.colormap.get("inferno", source="matplotlib"))
self.image_widget.setColorMap(pg.colormap.get("inferno", source="matplotlib"))
# pyqtgraph ImageView expects (cols, rows) — transpose so row 0 is at top
iv.setImage(img_data.T, autoLevels=True, autoHistogramRange=True)
iv.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
self.image_widget.setImage(img_data.T, autoLevels=True, autoHistogramRange=True)
self.image_widget.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Expanding)
self.stack_layout.addWidget(iv)
self.stack_layout.addWidget(self.image_widget)
self.apply_theme()
# ── Table ──────────────────────────────────────────────────────────────
def _show_table(self, data):
@@ -357,13 +298,13 @@ class DataPanel(QWidget):
if rows > MAX_ROWS or cols > MAX_COLS:
note = QLabel(f"⚠ Showing {show_rows}/{rows} rows × {show_cols}/{cols} cols")
note.setObjectName("info_label")
note.setAlignment(Qt.AlignCenter)
note.setAlignment(Qt.AlignmentFlag.AlignCenter)
self.stack_layout.addWidget(note)
table = QTableWidget(show_rows, show_cols)
table.setEditTriggers(QAbstractItemView.NoEditTriggers)
table.setSelectionMode(QAbstractItemView.ContiguousSelection)
table.horizontalHeader().setSectionResizeMode(QHeaderView.Interactive)
table.setEditTriggers(QAbstractItemView.EditTrigger.NoEditTriggers)
table.setSelectionMode(QAbstractItemView.SelectionMode.ContiguousSelection)
table.horizontalHeader().setSectionResizeMode(QHeaderView.ResizeMode.Interactive)
table.horizontalHeader().setDefaultSectionSize(90)
table.verticalHeader().setDefaultSectionSize(22)
table.setFont(QFont("JetBrains Mono, Consolas, monospace", 10))
@@ -381,7 +322,7 @@ class DataPanel(QWidget):
else f"{val:.4g}" if is_complex else str(val.decode()) if is_bytes else str(val)
)
cell = QTableWidgetItem(txt)
cell.setTextAlignment(Qt.AlignRight | Qt.AlignVCenter)
cell.setTextAlignment(Qt.AlignmentFlag.AlignRight | Qt.AlignmentFlag.AlignVCenter)
table.setItem(r, c, cell)
self.stack_layout.addWidget(table)
@@ -396,6 +337,15 @@ class HDF5Viewer(QMainWindow):
if filepath:
self.load_files([filepath])
def apply_theme(self, theme: Literal["dark", "light"]):
"""
Apply the theme
Args:
theme (str): Theme, either "dark" or "light"
"""
self.data_panel.apply_theme(theme)
def load_files(self, filepaths: list[str]):
"""Open one or more HDF5 files and add each as a top-level tree node."""
for f in filepaths:
@@ -410,7 +360,7 @@ class HDF5Viewer(QMainWindow):
root_layout = QHBoxLayout(central)
splitter = QSplitter(Qt.Horizontal)
splitter = QSplitter(Qt.Orientation.Horizontal)
splitter.setChildrenCollapsible(False)
# ── Left pane ──
@@ -418,11 +368,12 @@ class HDF5Viewer(QMainWindow):
left_layout = QVBoxLayout(left_pane)
self.tree = QTreeWidget()
self.tree.setMinimumWidth(250)
self.tree.setHeaderLabels(["Name", "Type", "Shape"])
self.tree.header().setStretchLastSection(False)
self.tree.header().setSectionResizeMode(0, QHeaderView.Stretch)
self.tree.header().setSectionResizeMode(1, QHeaderView.ResizeToContents)
self.tree.header().setSectionResizeMode(2, QHeaderView.ResizeToContents)
self.tree.header().setSectionResizeMode(0, QHeaderView.ResizeMode.Stretch)
self.tree.header().setSectionResizeMode(1, QHeaderView.ResizeMode.ResizeToContents)
self.tree.header().setSectionResizeMode(2, QHeaderView.ResizeMode.ResizeToContents)
self.tree.itemClicked.connect(self._on_item_clicked)
left_layout.addWidget(self.tree, 1)
@@ -446,15 +397,16 @@ class HDF5Viewer(QMainWindow):
h5file = self.h5files[filepath]
filename = filepath.split("/")[-1]
dataset_icon = material_icon("dataset", size=(ICON_SIZE, ICON_SIZE), color="#2980b9")
dataset_icon = material_icon(
"dataset", size=(ICON_SIZE, ICON_SIZE), color=get_accent_colors().default.name()
)
root_item = QTreeWidgetItem(self.tree, [filename, "Group", ""])
root_item.setIcon(0, dataset_icon)
root_item.setData(0, Qt.UserRole, "/")
root_item.setData(0, Qt.UserRole + 1, "group")
root_item.setData(0, Qt.UserRole + 2, filepath) # so clicks know which file
root_item.setForeground(0, QColor(ACCENT))
root_item.setData(0, Qt.ItemDataRole.UserRole, "/")
root_item.setData(0, Qt.ItemDataRole.UserRole + 1, "group")
root_item.setData(0, Qt.ItemDataRole.UserRole + 2, filepath) # so clicks know which file
populate_tree(root_item, h5file)
self.populate_tree(root_item, h5file)
self.tree.addTopLevelItem(root_item)
# Expand first 2 levels
@@ -464,16 +416,76 @@ class HDF5Viewer(QMainWindow):
self.tree.setCurrentItem(root_item)
def populate_tree(self, parent_item, h5_obj):
folder_icon = material_icon("folder", size=(ICON_SIZE, ICON_SIZE), color="#2980b9")
vector_icon = material_icon("show_chart", size=(ICON_SIZE, ICON_SIZE), color="#2980b9")
array_icon = material_icon(
"stacked_line_chart", size=(ICON_SIZE, ICON_SIZE), color="#2980b9"
)
scalar_icon = material_icon("point_scan", size=(ICON_SIZE, ICON_SIZE), color="#2980b9")
str_icon = material_icon("text_snippet", size=(ICON_SIZE, ICON_SIZE), color="#2980b9")
if not isinstance(h5_obj, h5py.Group):
return
for key in h5_obj.keys():
try:
child = h5_obj[key]
except Exception:
continue
if isinstance(child, h5py.Group):
item = QTreeWidgetItem(parent_item, [key, "Group", ""])
item.setIcon(0, folder_icon)
item.setData(0, Qt.ItemDataRole.UserRole, child.name)
item.setData(0, Qt.ItemDataRole.UserRole + 1, "group")
self.populate_tree(item, child)
elif isinstance(child, h5py.Dataset):
shape_str = "x".join(str(s) for s in child.shape) or "scalar"
d = child.dtype
dtype = "unknown"
if np.issubdtype(d, np.integer):
dtype = "int"
if np.issubdtype(d, np.floating):
dtype = "float"
if np.issubdtype(d, np.complexfloating):
dtype = "complex"
if d.kind in ("S", "U", "O"):
dtype = "str"
if shape_str == "scalar":
if dtype == "str":
icon = str_icon
else:
icon = scalar_icon
elif "x" in shape_str:
icon = array_icon
else:
icon = vector_icon
item = QTreeWidgetItem(parent_item, [key, dtype, shape_str])
item.setIcon(0, icon)
item.setData(0, Qt.ItemDataRole.UserRole, child.name)
item.setData(0, Qt.ItemDataRole.UserRole + 1, "dataset")
item.setForeground(1, QBrush(QColor(get_accent_colors().success.name())))
item.setForeground(2, QBrush(QColor("#656365")))
def _get_filepath_for_item(self, item: QTreeWidgetItem) -> str:
"""Walk up the tree to find the filepath stored on the root node."""
node = item
while node.parent():
node = node.parent()
return node.data(0, Qt.UserRole + 2)
return node.data(0, Qt.ItemDataRole.UserRole + 2)
def _on_item_clicked(self, item, _col):
path = item.data(0, Qt.UserRole)
kind = item.data(0, Qt.UserRole + 1)
path = item.data(0, Qt.ItemDataRole.UserRole)
kind = item.data(0, Qt.ItemDataRole.UserRole + 1)
filepath = self._get_filepath_for_item(item)
if not path or not filepath:
@@ -494,7 +506,7 @@ class HDF5Viewer(QMainWindow):
h5file.close()
self.h5files.clear()
self.tree.clear()
self.data_panel._show_empty() # TODO: If it works, make function public!
self.data_panel.show_empty()
def closeEvent(self, event):
for h5file in self.h5files.values():
@@ -9,7 +9,4 @@ designer_plugins = {
"DigitalTwin": ("debye_bec.bec_widgets.widgets.digital_twin.digital_twin", "DigitalTwin"),
}
widget_icons = {
"DataViewer": "lightbulb",
"DigitalTwin": "lightbulb",
}
widget_icons = {"DataViewer": "find_in_page", "DigitalTwin": "lightbulb"}
@@ -256,6 +256,7 @@ def fm_ideal_pitch(
Returns:
tuple[float, float | None]: Pitch of mirror in rad, qy in mm
"""
p_cm = bl.cm.center[1] # posCM
p = bl.fm.center[1] # posFM
q = smpl - bl.fm.center[1] # dist posFM to posEX
if fm_focus == "Defocused":
@@ -264,14 +265,25 @@ def fm_ideal_pitch(
assert fm_focx is not None, "fm_focx must be provided for Defocused mode"
assert fm_focy is not None, "fm_focy must be provided for Defocused mode"
a = 2 * np.tan(sldi_hacc) * bl.fm.center[1] # Beam width at focusing mirror
# logger.info(f"a: {a}")
# logger.info(f"sldi_hacc: {sldi_hacc}")
# logger.info(f"bl.fm.center[1]: {bl.fm.center[1]}")
# logger.info(f"p: {p}")
# logger.info(f"q: {q}")
b = (
2 * np.tan(sldi_vacc) * bl.cm.center[1]
) # Beam height at focusing mirror (collimated beam)
x = fm_focx
# logger.info(f"x: {x}")
x = 0.098821 * x**2 + 0.512344 * x # polynom to correct for spot size
# logger.info(f"x (corrected): {x}")
y = fm_focy
y = 3.183562 * y**2 + 1.258364 * y # polynom to correct for spot size
qx = q + x * p / a
qy = q + y * p / b
qy = q + y * p_cm / b
f = (p * qx) / (p + qx) # focal length
# logger.info(f"qx: {qx}")
# logger.info(f"f: {f}")
else: # Calculate for focused beam on sample in "manual" and "focused" mode
qy = None
f = (p * q) / (p + q) # focal length
@@ -280,6 +292,7 @@ def fm_ideal_pitch(
pitch = np.arcsin(bl.fm.r[0] / (2 * f)) # ideal pitch for FM
if "Pt" in fm_stripe:
pitch = np.arcsin(bl.fm.r[1] / (2 * f)) # ideal pitch for FM
# logger.info(f"pitch: {pitch}")
return pitch, qy
@@ -65,6 +65,7 @@ logger = bec_logger.logger
OFFSET_FILE_X01DA = Path(__file__).with_name("x01da_offsets.yaml")
OFFSET_FILE_X10DA = Path(__file__).with_name("x10da_offsets.yaml")
class DigitalTwin(BECWidget, QWidget):
"""
Main widget of Digital Twin
@@ -224,12 +225,12 @@ class DigitalTwin(BECWidget, QWidget):
if choice in ["yes", "y"]:
bl = input(f"Choose from: {[bl.value for bl in BeamlineId]}")
if bl in BeamlineId:
logger.info(f'Manually selected beamline {bl}')
logger.info(f"Manually selected beamline {bl}")
return BeamlineId(bl)
else:
raise ValueError(f'Wrong selection {bl}')
raise ValueError(f"Wrong selection {bl}")
else:
raise ValueError('Cannot open digital twin without a beamline')
raise ValueError("Cannot open digital twin without a beamline")
@SafeSlot()
def check_bec_config(self, *args):
@@ -362,6 +363,10 @@ class DigitalTwin(BECWidget, QWidget):
self.calc_fm_reflectivity()
self.calc_cm_fm_harm_suppr()
self.calc_mo1_energy_resolution()
case "hacc":
self.calc_fm_ideal_pitch()
case "vacc":
self.calc_fm_ideal_pitch()
case "cm_stripe":
self.calc_cm_crit_pitch()
self.calc_cm_reflectivity()
@@ -672,7 +677,7 @@ class DigitalTwin(BECWidget, QWidget):
# Load offsets
if not self.offset_file.exists():
raise FileNotFoundError(f"Offset file not found: {self.offset_file}")
with self.offset_file.open("r", encoding="utf-8") as f:
data = yaml.safe_load(f)
@@ -5,43 +5,43 @@ mo1_trx:
modifier:
axis: mo1_trx
range: [[-30, -0.1], [0.1, 30]]
offset: [0, 2.21]
offset: [-2.3, 1.31]
mo1_try:
modifier:
axis: mo1_trx
range: [[-30, -0.1], [0.1, 30]]
offset: [0, -1.6]
offset: [-1.78, -1.78]
sl1_centery:
offset: -1.8
offset: -1.2
fm_trx:
modifier:
axis: fm_trx
range: [[-66, -31], [-24, 7], [11, 31], [38, 66]]
offset: [0, 0, 0, -0.16]
offset: [-0.61, 0, 0, -0.16]
fm_try:
modifier:
axis: fm_trx
range: [[-66, -31], [-24, 7], [11, 31], [38, 66]]
offset: [0, 0, 0, -0.45]
offset: [0.028, 0, 0, -0.45]
fm_rotx:
modifier:
axis: fm_trx
range: [[-66, -31], [-24, 7], [11, 31], [38, 66]]
offset: [0, 0, 0, 0.063]
offset: [0.027, 0, 0, 0.045]
fm_roty:
modifier:
axis: fm_trx
range: [[-66, -31], [-24, 7], [11, 31], [38, 66]]
offset: [0, 0, 0, -0.04]
offset: [-0.038, 0, 0, -0.043]
sl2_centery:
offset: 1.2
offset: -0.7
ot_try:
offset: 0
@@ -70,10 +70,9 @@ xas_config:
#xrd_config:
# - !include ./x01da_xrd.yaml
# Commented out because too slow
## Hutch cameras
# hutch_cams:
# - !include ./x01da_hutch_cameras.yaml
hutch_cams:
- !include ./x01da_hutch_cameras.yaml
## Remaining experimental hutch
es_config:
+45 -25
View File
@@ -6,9 +6,13 @@ import threading
from typing import TYPE_CHECKING
import cv2
import numpy as np
import requests
from bec_lib.file_utils import get_full_path
from bec_lib.logger import bec_logger
from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo
from ophyd import Component as Cpt
from ophyd import Kind, Signal
from ophyd_devices import DeviceStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
@@ -16,7 +20,6 @@ from debye_bec.devices.utils.utils import fetch_scan_info
if TYPE_CHECKING: # pragma: no cover
from bec_lib.devicemanager import ScanInfo
from bec_lib.messages import ScanStatusMessage
logger = bec_logger.logger
@@ -28,59 +31,76 @@ CAM_PORT = 554
class HutchCam(PSIDeviceBase):
"""Class for the Hutch Cameras"""
# image = Cpt(Signal, name='image', kind='config')
USER_ACCESS = ["acquire_from_image"]
image = Cpt(Signal, name="image", kind=Kind.config)
def __init__(self, *, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs):
super().__init__(name=name, scan_info=scan_info, **kwargs)
self.scan_parameters: ScanServerScanInfo = None
self.hostname = prefix
self.status = None
self.name = ""
# pylint: disable=E1101
def on_connected(self) -> None:
"""
Called after the device is connected and its signals are connected.
Default values for signals should be set here.
Retrieve camera name which also makes sure the camera is connected.
"""
rtsp_url = f"rtsp://{CAM_USERNAME}:{CAM_PASSWORD}@{self.hostname}.psi.ch:{CAM_PORT}/rtpstream/config1"
cap = cv2.VideoCapture(f"{rtsp_url}?tcp")
if not cap.isOpened():
logger.error(self, "Connection Failed", "Could not connect to the camera stream.")
return
cap.release()
info_url = f"http://{CAM_USERNAME}:{CAM_PASSWORD}@{self.hostname}.psi.ch/-wvhttp-01-/info.cgi?item=c.1.name"
response = requests.get(info_url, timeout=5)
response.raise_for_status()
self.name = response.content.decode("utf-8").split("c.1.name.utf8:=")[-1].strip().lower()
def on_stage(self) -> DeviceStatus:
"""Called while staging the device."""
self.scan_parameters = fetch_scan_info(self.scan_info)
file_path = get_full_path(self.scan_info, name="hutch_cam_" + self.hostname).removesuffix(
"h5"
)
self.status = DeviceStatus(self)
file_path = get_full_path(self.scan_info.msg, name=self.name).removesuffix("h5")
thread = threading.Thread(
target=self._save_picture, args=(file_path, self.status), daemon=True
target=self.acquire_and_save_from_video, args=(file_path,), daemon=True
)
thread.start()
return self.status
return None
def _save_picture(self, file_path, status):
def acquire_from_image(self):
"""
Acquire an image from the image endpoint of the camera. Resolution is
1280 x 720 px and cannot be changed. Acquisition is fast.
"""
logger.debug(f"Capture from camera {self.hostname}")
image_url = (
f"http://{CAM_USERNAME}:{CAM_PASSWORD}@{self.hostname}.psi.ch/-wvhttp-01-/image.cgi"
)
response = requests.get(image_url, timeout=5)
response.raise_for_status()
img = cv2.imdecode(
np.frombuffer(response.content, np.uint8), cv2.IMREAD_COLOR
) # Or IMREAD_GRAYSCALE
self.image.put(img)
def acquire_and_save_from_video(self, file_path):
"""
Acquire an image from a videostream. Videostream resolution can be changed in the camera settings.
Acquisition is slow, as opening the video stream can take a few seconds.
Args:
file_path (str): File path including filename where image will be saved.
"""
try:
logger.info(f"Capture from camera {self.hostname}")
logger.debug(f"Capture from camera {self.hostname}")
rtsp_url = f"rtsp://{CAM_USERNAME}:{CAM_PASSWORD}@{self.hostname}.psi.ch:{CAM_PORT}/rtpstream/config1"
cap = cv2.VideoCapture(f"{rtsp_url}?tcp")
if not cap.isOpened():
logger.error("Connection Failed", "Could not connect to the camera stream.")
return
logger.info(f"Connection to camera {self.hostname} established")
ret, frame = cap.readAsync()
logger.debug(f"Connection to camera {self.hostname} established")
ret, frame = cap.read()
cap.release()
if not ret:
logger.error("Capture Failed", "Failed to capture image from camera.")
return
cv2.imwrite(file_path + "png", frame)
status.set_finished()
logger.info(f"Capture from camera {self.hostname} done")
logger.debug(f"Capture from camera {self.hostname} done")
except Exception as e:
status.set_exception(e)
logger.error(f"Could not acquire image from video stream: {e}")