Feat/slit gui and tomo parameter gui #241

Merged
holler merged 10 commits from feat/slit_gui into main 2026-07-02 09:02:25 +02:00
16 changed files with 1825 additions and 0 deletions
+81
View File
@@ -13,10 +13,91 @@ logger = bec_logger.logger
_Widgets = {
"SlitControlWidget": "SlitControlWidget",
"TomoParamsWidget": "TomoParamsWidget",
"XRayEye": "XRayEye",
}
class SlitControlWidget(RPCBase):
"""Interactive GUI for cSAXS slit center and size control."""
_IMPORT_MODULE = "csaxs_bec.bec_widgets.widgets.slit_control.slit_control"
@rpc_timeout(20)
@rpc_call
def set_slit(self, slit: "int"):
"""
Switch the widget to control a different slit (16).
"""
@rpc_call
def move_center(self, direction: "str"):
"""
Move the slit center by one step. direction: up / down / left / right.
"""
@rpc_call
def move_size(self, direction: "str"):
"""
Adjust the slit size by one step. up/right = grow, down/left = shrink.
"""
@rpc_call
def set_step(self, value: "float"):
"""
Set the shared step size in mm (clamped to 0.0052.0 mm).
"""
@rpc_call
def double_step(self):
"""
Double the current step size.
"""
@rpc_call
def halve_step(self):
"""
Halve the current step size.
"""
class TomoParamsWidget(RPCBase):
"""Interactive GUI for FlOMNI tomo scan parameter editing and queue management."""
_IMPORT_MODULE = "csaxs_bec.bec_widgets.widgets.tomo_params.tomo_params"
@rpc_call
def refresh(self) -> "None":
"""
Full refresh: params, queue, progress, sample name.
"""
@rpc_call
def enter_edit_mode(self) -> "None":
"""
Unlock all parameter fields for editing.
"""
@rpc_call
def cancel_edit(self) -> "None":
"""
Discard changes and re-lock fields.
"""
@rpc_call
def submit_params(self) -> "None":
"""
Validate, write to global vars, and re-lock fields.
"""
@rpc_call
def add_to_queue(self) -> "None":
"""
Snapshot the current live global-var parameters and append a new job.
"""
class XRayEye(RPCBase):
_IMPORT_MODULE = "csaxs_bec.bec_widgets.widgets.xray_eye.x_ray_eye"
@@ -5,9 +5,19 @@ from __future__ import annotations
# pylint: skip-file
designer_plugins = {
"SlitControlWidget": (
"csaxs_bec.bec_widgets.widgets.slit_control.slit_control",
"SlitControlWidget",
),
"TomoParamsWidget": (
"csaxs_bec.bec_widgets.widgets.tomo_params.tomo_params",
"TomoParamsWidget",
),
"XRayEye": ("csaxs_bec.bec_widgets.widgets.xray_eye.x_ray_eye", "XRayEye"),
}
widget_icons = {
"SlitControlWidget": "widgets",
"TomoParamsWidget": "widgets",
"XRayEye": "widgets",
}
@@ -0,0 +1,3 @@
from csaxs_bec.bec_widgets.widgets.slit_control.slit_control import SlitControlWidget
__all__ = ["SlitControlWidget"]
@@ -0,0 +1,15 @@
def main(): # pragma: no cover
from qtpy import PYSIDE6
if not PYSIDE6:
print("PYSIDE6 is not available in the environment. Cannot patch designer.")
return
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
from csaxs_bec.bec_widgets.widgets.slit_control.slit_control_plugin import SlitControlPlugin
QPyDesignerCustomWidgetCollection.addCustomWidget(SlitControlPlugin())
if __name__ == "__main__": # pragma: no cover
main()
@@ -0,0 +1,15 @@
def main(): # pragma: no cover
from qtpy import PYSIDE6
if not PYSIDE6:
print("PYSIDE6 is not available in the environment. Cannot patch designer.")
return
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
from csaxs_bec.bec_widgets.widgets.slit_control.slit_control_widget_plugin import SlitControlWidgetPlugin
QPyDesignerCustomWidgetCollection.addCustomWidget(SlitControlWidgetPlugin())
if __name__ == "__main__": # pragma: no cover
main()
@@ -0,0 +1,501 @@
from __future__ import annotations
from bec_lib import bec_logger
from bec_widgets import BECWidget
from bec_widgets.utils.rpc_decorator import rpc_timeout
from qtpy.QtCore import Qt, QTimer
from qtpy.QtGui import QKeySequence
from qtpy.QtWidgets import (
QButtonGroup,
QDoubleSpinBox,
QFrame,
QGridLayout,
QGroupBox,
QHBoxLayout,
QHeaderView,
QLabel,
QPushButton,
QRadioButton,
QShortcut,
QTableWidget,
QTableWidgetItem,
QVBoxLayout,
QWidget,
)
logger = bec_logger.logger
# ── arrow glyphs ─────────────────────────────────────────────────────────────
_UP = "\u2191" # ↑
_DOWN = "\u2193" # ↓
_LEFT = "\u2190" # ←
_RIGHT = "\u2192" # →
_SHRINK_X = "\u2192\u2190" # →← inward horizontal
_GROW_X = "\u2190\u2192" # ←→ outward horizontal
# Vertical size: ⬇/⬆ (bold filled arrows), no separator line
_SHRINK_Y = "\u2b07\n\u2b06" # ⬇⬆ inward vertical
_GROW_Y = "\u2b06\n\u2b07" # ⬆⬇ outward vertical
_Y_BTN_MIN_H = 55 # px enough for two bold lines
def _separator() -> QFrame:
sep = QFrame()
sep.setFrameShape(QFrame.Shape.HLine)
sep.setFrameShadow(QFrame.Shadow.Sunken)
return sep
# ── keyboard shortcut map ─────────────────────────────────────────────────────
# (Qt key string, action label for legend)
_KB_CENTER = [
("Up", "", "center up"),
("Down", "", "center down"),
("Left", "", "center left"),
("Right", "", "center right"),
]
_KB_SIZE = [
("S", "S", "x smaller"),
("F", "F", "x wider"),
("X", "X", "y smaller"),
("E", "E", "y wider"),
]
_KB_STEP = [
(",", ",", "step ÷ 2"),
(".", ".", "step × 2"),
]
class SlitControlWidget(BECWidget, QWidget):
"""
Interactive GUI for cSAXS slit center and size control.
Layout (top → bottom)
---------------------
1. Overview table all six slits with live readbacks and radio-button
selection (polled every ``_POLL_MS`` ms via ``device.read()``).
2. "Tweaking: Slit N" label.
3. S cluster (size, LEFT) | C cluster (center, RIGHT) ← mirrors keyboard.
4. Step control: [× 2] [step spinbox] [÷ 2].
5. ⌨ Keyboard-tweaking toggle + legend (hidden when inactive).
Keyboard mode
-------------
Toggle with the ⌨ button. When active:
* 16 select slit
* ← → ↑ ↓ move center
* S / F shrink / grow x size
* X / E shrink / grow y size
* , / . halve / double step
* Esc exit keyboard mode
"""
PLUGIN = True
USER_ACCESS = [
"set_slit",
"move_center",
"move_size",
"set_step",
"double_step",
"halve_step",
]
_SLIT_LABELS: dict[int, str] = {
1: "Slit 1 (frontend)",
2: "Slit 2 (optics 1)",
3: "Slit 3 (optics 2)",
4: "Slit 4 (EB1 entrance)",
5: "Slit 5 (EB1 exit)",
6: "Slit 6 (EB2)",
}
_STEP_MIN = 0.005 # mm (5 µm)
_STEP_MAX = 2.0 # mm
_STEP_DEFAULT = 0.05 # mm (50 µm)
_POLL_MS = 2000
def __init__(self, parent=None, slit: int = 4, **kwargs):
super().__init__(parent=parent, **kwargs)
self.get_bec_shortcuts()
self.slit = slit
self.step = self._STEP_DEFAULT
self._ov_items: dict[tuple[int, int], QTableWidgetItem] = {}
self._shortcuts: list[QShortcut] = []
# rotating index through non-selected slits for slow background polling
self._slow_poll_idx: int = 0
self._build_ui()
self._build_shortcuts() # created once, enabled/disabled by toggle
self.setFocusPolicy(Qt.FocusPolicy.StrongFocus)
# polling timer
self._poll_timer = QTimer(self)
self._poll_timer.setInterval(self._POLL_MS)
self._poll_timer.timeout.connect(self._refresh_overview)
self._poll_timer.start()
self._refresh_overview()
# ── UI construction ───────────────────────────────────────────────────────
def _build_ui(self):
root = QVBoxLayout(self)
root.setSpacing(8)
# 1. overview table
root.addWidget(self._build_overview())
root.addWidget(_separator())
# 2. active-slit label
self._tweaking_lbl = QLabel(f"Tweaking: {self._SLIT_LABELS[self.slit]}")
self._tweaking_lbl.setAlignment(Qt.AlignmentFlag.AlignCenter)
root.addWidget(self._tweaking_lbl)
# 3. S cluster (LEFT) | C cluster (RIGHT) ← mirrors keyboard layout
clusters = QHBoxLayout()
clusters.addWidget(self._build_size_cluster())
clusters.addWidget(self._build_center_cluster())
root.addLayout(clusters)
root.addWidget(_separator())
# 4. step [× 2] [spinbox] [÷ 2]
root.addWidget(self._build_step_control())
root.addWidget(_separator())
# 5. keyboard toggle + legend
self._btn_kb = QPushButton("\u2328 Keyboard tweaking")
self._btn_kb.setCheckable(True)
self._btn_kb.toggled.connect(self._on_kb_toggled)
root.addWidget(self._btn_kb)
self._kb_legend = self._build_kb_legend()
self._kb_legend.setVisible(False)
root.addWidget(self._kb_legend)
def _build_overview(self) -> QGroupBox:
box = QGroupBox("All slits (mm) select to tweak")
vbox = QVBoxLayout(box)
tbl = QTableWidget(len(self._SLIT_LABELS), 6)
tbl.setHorizontalHeaderLabels(["", "Slit", "x center", "x size", "y center", "y size"])
# col 0 (radio) and col 1 (name) sized to content;
# value cols 2-5 share remaining width equally → evenly distributed
tbl.horizontalHeader().setSectionResizeMode(0, QHeaderView.ResizeMode.ResizeToContents)
tbl.horizontalHeader().setSectionResizeMode(1, QHeaderView.ResizeMode.ResizeToContents)
for col in range(2, 6):
tbl.horizontalHeader().setSectionResizeMode(col, QHeaderView.ResizeMode.Stretch)
tbl.verticalHeader().setVisible(False)
tbl.setEditTriggers(QTableWidget.EditTrigger.NoEditTriggers)
tbl.setSelectionMode(QTableWidget.SelectionMode.NoSelection)
self._ov_table = tbl
self._slit_group = QButtonGroup(self)
for row, (slit_num, label) in enumerate(self._SLIT_LABELS.items()):
container = QWidget()
cl = QHBoxLayout(container)
cl.setContentsMargins(2, 0, 2, 0)
cl.setAlignment(Qt.AlignmentFlag.AlignCenter)
radio = QRadioButton()
radio.setChecked(slit_num == self.slit)
self._slit_group.addButton(radio, slit_num)
cl.addWidget(radio)
tbl.setCellWidget(row, 0, container)
name_item = QTableWidgetItem(label)
name_item.setFlags(Qt.ItemFlag.ItemIsEnabled)
tbl.setItem(row, 1, name_item)
for col_off in range(4):
item = QTableWidgetItem("---")
item.setTextAlignment(Qt.AlignmentFlag.AlignRight | Qt.AlignmentFlag.AlignVCenter)
item.setFlags(Qt.ItemFlag.ItemIsEnabled)
tbl.setItem(row, col_off + 2, item)
self._ov_items[(row, col_off + 2)] = item
self._slit_group.idClicked.connect(self.set_slit)
self._update_row_bold()
vbox.addWidget(tbl)
return box
def _build_size_cluster(self) -> QGroupBox:
"""Size cluster on the LEFT (s/f/e/x keys are on the left of a keyboard)."""
box = QGroupBox("Size (S)")
grid = QGridLayout(box)
grid.setSpacing(4)
lbl = QLabel("S")
lbl.setAlignment(Qt.AlignmentFlag.AlignCenter)
btn_grow_y = QPushButton(_GROW_Y)
btn_shrink_y = QPushButton(_SHRINK_Y)
btn_shrink_x = QPushButton(_SHRINK_X)
btn_grow_x = QPushButton(_GROW_X)
btn_grow_y.setMinimumHeight(_Y_BTN_MIN_H)
btn_shrink_y.setMinimumHeight(_Y_BTN_MIN_H)
btn_grow_y.setToolTip("Increase y size [E]")
btn_shrink_y.setToolTip("Decrease y size [X]")
btn_shrink_x.setToolTip("Decrease x size [S]")
btn_grow_x.setToolTip("Increase x size [F]")
btn_grow_y.clicked.connect(lambda: self.move_size("up"))
btn_shrink_y.clicked.connect(lambda: self.move_size("down"))
btn_shrink_x.clicked.connect(lambda: self.move_size("left"))
btn_grow_x.clicked.connect(lambda: self.move_size("right"))
grid.addWidget(btn_grow_y, 0, 1)
grid.addWidget(btn_shrink_x, 1, 0)
grid.addWidget(lbl, 1, 1)
grid.addWidget(btn_grow_x, 1, 2)
grid.addWidget(btn_shrink_y, 2, 1)
return box
def _build_center_cluster(self) -> QGroupBox:
"""Center cluster on the RIGHT (arrow keys are on the right of a keyboard)."""
box = QGroupBox("Center (C)")
grid = QGridLayout(box)
grid.setSpacing(4)
lbl = QLabel("C")
lbl.setAlignment(Qt.AlignmentFlag.AlignCenter)
btn_up = QPushButton(_UP)
btn_down = QPushButton(_DOWN)
btn_left = QPushButton(_LEFT)
btn_right = QPushButton(_RIGHT)
btn_up.setToolTip("Center up [↑]")
btn_down.setToolTip("Center down [↓]")
btn_left.setToolTip("Center left [←]")
btn_right.setToolTip("Center right [→]")
btn_up.clicked.connect(lambda: self.move_center("up"))
btn_down.clicked.connect(lambda: self.move_center("down"))
btn_left.clicked.connect(lambda: self.move_center("left"))
btn_right.clicked.connect(lambda: self.move_center("right"))
grid.addWidget(btn_up, 0, 1)
grid.addWidget(btn_left, 1, 0)
grid.addWidget(lbl, 1, 1)
grid.addWidget(btn_right, 1, 2)
grid.addWidget(btn_down, 2, 1)
return box
def _build_step_control(self) -> QGroupBox:
box = QGroupBox("Step size (mm)")
layout = QHBoxLayout(box)
btn_double = QPushButton("\u00d7 2")
self._step_spin = QDoubleSpinBox()
self._step_spin.setDecimals(4)
self._step_spin.setRange(self._STEP_MIN, self._STEP_MAX)
self._step_spin.setSingleStep(0.005)
self._step_spin.setValue(self.step)
self._step_spin.valueChanged.connect(self._on_step_changed)
btn_half = QPushButton("\u00f7 2")
btn_double.setToolTip("Double step [.]")
btn_half.setToolTip("Halve step [,]")
btn_double.clicked.connect(self.double_step)
btn_half.clicked.connect(self.halve_step)
layout.addWidget(btn_double)
layout.addWidget(self._step_spin)
layout.addWidget(btn_half)
return box
def _build_kb_legend(self) -> QGroupBox:
box = QGroupBox("\u2328 Keyboard tweaking active — press Esc to exit")
grid = QGridLayout(box)
grid.setSpacing(2)
grid.setColumnMinimumWidth(1, 20)
rows = [
("Slit:", "1 2 3 4 5 6"),
("Center:", "\u2190 \u2192 \u2191 \u2193 (arrow keys)"),
("Size x:", "S smaller F wider"),
("Size y:", "X smaller E wider"),
("Step:", ", halve . double"),
]
for r, (label, desc) in enumerate(rows):
lbl = QLabel(f"<b>{label}</b>")
desc_lbl = QLabel(desc)
grid.addWidget(lbl, r, 0, Qt.AlignmentFlag.AlignRight)
grid.addWidget(desc_lbl, r, 2)
return box
# ── keyboard shortcuts ────────────────────────────────────────────────────
def _build_shortcuts(self):
"""Create all shortcuts once (disabled); toggle enables/disables them."""
ctx = Qt.ShortcutContext.WidgetWithChildrenShortcut
def _sc(key: str, fn) -> QShortcut:
sc = QShortcut(QKeySequence(key), self)
sc.setContext(ctx)
sc.activated.connect(fn)
sc.setEnabled(False)
return sc
# slit selection 1-6
for num in range(1, 7):
self._shortcuts.append(_sc(str(num), lambda n=num: self.set_slit(n)))
# center movement (arrow keys)
self._shortcuts.append(_sc("Up", lambda: self.move_center("up")))
self._shortcuts.append(_sc("Down", lambda: self.move_center("down")))
self._shortcuts.append(_sc("Left", lambda: self.move_center("left")))
self._shortcuts.append(_sc("Right", lambda: self.move_center("right")))
# size keys
self._shortcuts.append(_sc("S", lambda: self.move_size("left"))) # x smaller
self._shortcuts.append(_sc("F", lambda: self.move_size("right"))) # x wider
self._shortcuts.append(_sc("X", lambda: self.move_size("down"))) # y smaller
self._shortcuts.append(_sc("E", lambda: self.move_size("up"))) # y wider
# step
self._shortcuts.append(_sc(",", self.halve_step))
self._shortcuts.append(_sc(".", self.double_step))
# escape to exit keyboard mode
self._shortcuts.append(_sc("Escape", lambda: self._btn_kb.setChecked(False)))
def _on_kb_toggled(self, checked: bool):
for sc in self._shortcuts:
sc.setEnabled(checked)
self._kb_legend.setVisible(checked)
if checked:
self._btn_kb.setText("\u2328 Keyboard tweaking ON")
self.setFocus() # grab focus so shortcuts fire immediately
else:
self._btn_kb.setText("\u2328 Keyboard tweaking")
# ── overview polling ──────────────────────────────────────────────────────
def _refresh_slit_row(self, slit_num: int) -> None:
"""Read all four axis values for one slit and update its table row."""
row = list(self._SLIT_LABELS).index(slit_num)
for col_off, suffix in enumerate(("xc", "xs", "yc", "ys")):
dev_name = f"sl{slit_num}{suffix}"
item = self._ov_items.get((row, col_off + 2))
if item is None:
continue
try:
device = getattr(self.dev, dev_name)
reading = device.read()
value = float(reading[dev_name]["value"])
item.setText(f"{value:.4f}")
except Exception:
item.setText("---")
def _refresh_overview(self) -> None:
"""
Per-tick (2 s) refresh strategy:
- Selected slit: every tick → updated every 2 s.
- Other slits: one per tick in rotation → each updated every ~10 s
(5 other slits × 2 s). At most 8 device.read() calls per tick.
"""
# always refresh the selected slit
self._refresh_slit_row(self.slit)
# advance through the other slits one per tick
others = [n for n in self._SLIT_LABELS if n != self.slit]
if others:
self._slow_poll_idx = self._slow_poll_idx % len(others)
self._refresh_slit_row(others[self._slow_poll_idx])
self._slow_poll_idx += 1
def _update_row_bold(self):
for row, (slit_num, _) in enumerate(self._SLIT_LABELS.items()):
item = self._ov_table.item(row, 1)
if item is None:
continue
font = item.font()
font.setBold(slit_num == self.slit)
item.setFont(font)
# ── device naming ─────────────────────────────────────────────────────────
def _dev_name(self, suffix: str) -> str:
return f"sl{self.slit}{suffix}"
# ── public API ────────────────────────────────────────────────────────────
@rpc_timeout(20)
def set_slit(self, slit: int):
"""Switch the widget to control a different slit (16)."""
if slit not in self._SLIT_LABELS:
raise ValueError(f"Unknown slit number: {slit}")
self.slit = slit
btn = self._slit_group.button(slit)
if btn is not None:
btn.setChecked(True)
self._tweaking_lbl.setText(f"Tweaking: {self._SLIT_LABELS[slit]}")
self._update_row_bold()
def move_center(self, direction: str):
"""Move the slit center. direction: up / down / left / right."""
axis_sign = {
"up": ("yc", 1),
"down": ("yc", -1),
"right": ("xc", 1),
"left": ("xc", -1),
}
if direction not in axis_sign:
raise ValueError(f"Unknown direction: {direction!r}")
suffix, sign = axis_sign[direction]
self._move_relative(suffix, sign * self.step)
def move_size(self, direction: str):
"""Adjust slit size. up/right = grow, down/left = shrink."""
axis_sign = {
"up": ("ys", 1),
"down": ("ys", -1),
"right": ("xs", 1),
"left": ("xs", -1),
}
if direction not in axis_sign:
raise ValueError(f"Unknown direction: {direction!r}")
suffix, sign = axis_sign[direction]
self._move_relative(suffix, sign * self.step)
def _move_relative(self, axis_suffix: str, delta: float):
dev_name = self._dev_name(axis_suffix)
try:
device = getattr(self.dev, dev_name)
reading = device.read()
current = float(reading[dev_name]["value"])
device.move(current + delta)
except Exception as exc:
logger.warning(f"SlitControlWidget: failed to move {dev_name}: {exc}")
def set_step(self, value: float):
"""Set step size in mm (clamped to 0.0052.0 mm)."""
value = max(self._STEP_MIN, min(self._STEP_MAX, float(value)))
self.step = value
self._step_spin.blockSignals(True)
self._step_spin.setValue(self.step)
self._step_spin.blockSignals(False)
def double_step(self):
self.set_step(self.step * 2)
def halve_step(self):
self.set_step(self.step / 2)
# ── internal slots ────────────────────────────────────────────────────────
def _on_step_changed(self, value: float):
self.step = max(self._STEP_MIN, min(self._STEP_MAX, value))
# ── cleanup ───────────────────────────────────────────────────────────────
def cleanup(self):
self._poll_timer.stop()
for sc in self._shortcuts:
sc.setEnabled(False)
super().cleanup()
@@ -0,0 +1,56 @@
# Copyright (C) 2022 The Qt Company Ltd.
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause
from bec_widgets.utils.bec_designer import designer_material_icon
from qtpy.QtDesigner import QDesignerCustomWidgetInterface
from qtpy.QtWidgets import QWidget
from csaxs_bec.bec_widgets.widgets.slit_control.slit_control import SlitControlWidget
DOM_XML = """
<ui language='c++'>
<widget class='SlitControlWidget' name='slit_control'>
</widget>
</ui>
"""
class SlitControlPlugin(QDesignerCustomWidgetInterface): # pragma: no cover
def __init__(self):
super().__init__()
self._form_editor = None
def createWidget(self, parent):
if parent is None:
return QWidget()
return SlitControlWidget(parent)
def domXml(self):
return DOM_XML
def group(self):
return ""
def icon(self):
return designer_material_icon(SlitControlWidget.ICON_NAME)
def includeFile(self):
return "slit_control"
def initialize(self, form_editor):
self._form_editor = form_editor
def isContainer(self):
return False
def isInitialized(self):
return self._form_editor is not None
def name(self):
return "SlitControlWidget"
def toolTip(self):
return "SlitControlWidget"
def whatsThis(self):
return self.toolTip()
@@ -0,0 +1 @@
{'files': ['slit_control.py']}
@@ -0,0 +1,57 @@
# Copyright (C) 2022 The Qt Company Ltd.
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause
from qtpy.QtDesigner import QDesignerCustomWidgetInterface
from qtpy.QtWidgets import QWidget
from bec_widgets.utils.bec_designer import designer_material_icon
from csaxs_bec.bec_widgets.widgets.slit_control.slit_control import SlitControlWidget
DOM_XML = """
<ui language='c++'>
<widget class='SlitControlWidget' name='slit_control_widget'>
</widget>
</ui>
"""
class SlitControlWidgetPlugin(QDesignerCustomWidgetInterface): # pragma: no cover
def __init__(self):
super().__init__()
self._form_editor = None
def createWidget(self, parent):
if parent is None:
return QWidget()
t = SlitControlWidget(parent)
return t
def domXml(self):
return DOM_XML
def group(self):
return ""
def icon(self):
return designer_material_icon(SlitControlWidget.ICON_NAME)
def includeFile(self):
return "slit_control_widget"
def initialize(self, form_editor):
self._form_editor = form_editor
def isContainer(self):
return False
def isInitialized(self):
return self._form_editor is not None
def name(self):
return "SlitControlWidget"
def toolTip(self):
return ""
def whatsThis(self):
return self.toolTip()
@@ -0,0 +1,3 @@
from csaxs_bec.bec_widgets.widgets.tomo_params.tomo_params import TomoParamsWidget
__all__ = ["TomoParamsWidget"]
@@ -0,0 +1,15 @@
def main(): # pragma: no cover
from qtpy import PYSIDE6
if not PYSIDE6:
print("PYSIDE6 is not available in the environment. Cannot patch designer.")
return
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
from csaxs_bec.bec_widgets.widgets.tomo_params.tomo_params_plugin import TomoParamsPlugin
QPyDesignerCustomWidgetCollection.addCustomWidget(TomoParamsPlugin())
if __name__ == "__main__": # pragma: no cover
main()
@@ -0,0 +1,15 @@
def main(): # pragma: no cover
from qtpy import PYSIDE6
if not PYSIDE6:
print("PYSIDE6 is not available in the environment. Cannot patch designer.")
return
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
from csaxs_bec.bec_widgets.widgets.tomo_params.tomo_params_widget_plugin import TomoParamsWidgetPlugin
QPyDesignerCustomWidgetCollection.addCustomWidget(TomoParamsWidgetPlugin())
if __name__ == "__main__": # pragma: no cover
main()
@@ -0,0 +1,942 @@
"""
TomoParamsWidget BEC widget for editing FlOMNI tomo scan parameters and
managing the tomo scan queue.
Parameters are stored as BEC global vars; this widget reads and writes them
via ``self.client.get_global_var`` / ``self.client.set_global_var``, which
is the same mechanism used by the ``Flomni`` CLI object.
Polling
-------
BEC does not publish push notifications for global-var changes, so a
``QTimer`` polls ``tomo_queue`` and ``tomo_progress`` every 2 s to stay in
sync with changes made in a parallel CLI session. The parameter panel is
refreshed on demand (when the user opens it or submits changes) rather than
on every poll, to avoid clobbering in-progress edits.
Queue execution
---------------
``tomo_queue_execute()`` is a blocking Flomni CLI method that cannot be
called directly from the GUI server process. This widget manages the queue
via direct global-var manipulation (add / delete / clear) and shows an
explicit hint for executing it from the CLI.
"""
from __future__ import annotations
import datetime
from typing import Any, Optional
from bec_lib import bec_logger
from bec_widgets import BECWidget, SafeSlot
from qtpy.QtCore import Qt, QTimer
from qtpy.QtWidgets import (
QCheckBox,
QComboBox,
QDialog,
QDoubleSpinBox,
QFormLayout,
QFrame,
QGroupBox,
QHBoxLayout,
QHeaderView,
QInputDialog,
QLabel,
QLineEdit,
QMessageBox,
QPushButton,
QScrollArea,
QSpinBox,
QTableWidget,
QTableWidgetItem,
QVBoxLayout,
QWidget,
)
logger = bec_logger.logger
# ── constants ────────────────────────────────────────────────────────────────
TOMO_TYPES = {
1: "8 equally spaced sub-tomograms",
2: "Golden ratio (sorted in bunches)",
3: "Equally spaced, golden ratio start",
}
STATUS_COLORS = {
"pending": "#888888",
"running": "#2196F3",
"incomplete": "#FF9800",
"done": "#4CAF50",
}
# Exact tuple from Flomni._TOMO_QUEUE_PARAM_NAMES (source of truth in flomni.py)
QUEUE_PARAM_NAMES = (
"tomo_countingtime",
"tomo_shellstep",
"fovx",
"fovy",
"stitch_x",
"stitch_y",
"tomo_stitch_overlap",
"ptycho_reconstruct_foldername",
"manual_shift_y",
"frames_per_trigger",
"single_point_instead_of_fermat_scan",
"tomo_type",
"tomo_angle_range",
"tomo_angle_stepsize",
"golden_ratio_bunch_size",
"golden_max_number_of_projections",
"golden_projections_at_0_deg_for_damage_estimation",
"zero_deg_reference_at_each_subtomo",
"corridor_size",
)
DEFAULTS: dict[str, Any] = {
"tomo_countingtime": 0.1,
"tomo_shellstep": 1.0,
"fovx": 20.0,
"fovy": 20.0,
"stitch_x": 0,
"stitch_y": 0,
"tomo_stitch_overlap": 0.2,
"ptycho_reconstruct_foldername": "ptycho_reconstruct",
"manual_shift_y": 0.0,
"frames_per_trigger": 1,
"single_point_instead_of_fermat_scan": False,
"tomo_type": 1,
"corridor_size": -1.0,
"tomo_angle_range": 180,
"tomo_angle_stepsize": 10.0,
"zero_deg_reference_at_each_subtomo": False,
"golden_ratio_bunch_size": 20,
"golden_max_number_of_projections": 1000.0,
"golden_projections_at_0_deg_for_damage_estimation": 0,
}
def _hline() -> QFrame:
line = QFrame()
line.setFrameShape(QFrame.Shape.HLine)
line.setFrameShadow(QFrame.Shadow.Sunken)
return line
# ── main widget ──────────────────────────────────────────────────────────────
class TomoParamsWidget(BECWidget, QWidget):
"""
Interactive GUI for FlOMNI tomo scan parameter editing and queue management.
Layout
------
Top half (scrollable):
- Read-only sample-name header
- Tomo-type dropdown (always visible)
- Common parameters (always visible)
- Type-1 section: angle range, projected total, 0° reference
- Type-2/3 section: golden ratio parameters
- Edit / Submit / Cancel buttons
Bottom half (splitter):
- Queue table: all jobs with status, type, FOV, added_at
- Queue action buttons: Add, Delete selected, Clear, Execute hint
- Progress panel: live tomo_progress fields, polled every 2 s
"""
PLUGIN = True
USER_ACCESS = [
"refresh",
"enter_edit_mode",
"cancel_edit",
"submit_params",
"add_to_queue",
]
_POLL_INTERVAL_MS = 2000
def __init__(self, parent=None, **kwargs):
super().__init__(parent=parent, **kwargs)
self.get_bec_shortcuts()
self._edit_mode = False
# widget refs populated by _build_params_panel
self._pw: dict[str, QWidget] = {} # key -> input widget
self._build_ui()
# polling timer does NOT refresh params (avoid overwriting edits)
self._poll_timer = QTimer(self)
self._poll_timer.setInterval(self._POLL_INTERVAL_MS)
self._poll_timer.timeout.connect(self._on_poll)
self._poll_timer.start()
# initial load
self.refresh()
# ── global-var I/O ───────────────────────────────────────────────────────
def _gv_get(self, key: str) -> Any:
try:
return self.client.get_global_var(key)
except Exception as exc:
logger.warning(f"TomoParamsWidget: get_global_var({key!r}) failed: {exc}")
return None
def _gv_set(self, key: str, value: Any) -> bool:
try:
self.client.set_global_var(key, value)
return True
except Exception as exc:
logger.warning(f"TomoParamsWidget: set_global_var({key!r}) failed: {exc}")
return False
def _load_params(self) -> dict[str, Any]:
params = {}
for key in QUEUE_PARAM_NAMES:
val = self._gv_get(key)
params[key] = val if val is not None else DEFAULTS.get(key)
return params
def _load_queue(self) -> list[dict]:
val = self._gv_get("tomo_queue")
if not isinstance(val, list):
return []
return val
def _save_queue(self, jobs: list[dict]) -> None:
self._gv_set("tomo_queue", jobs)
# ── UI construction ───────────────────────────────────────────────────────
def _build_ui(self):
root = QVBoxLayout(self)
root.setSpacing(6)
# scrollable params panel
params_scroll = QScrollArea()
params_scroll.setWidgetResizable(True)
params_inner = QWidget()
params_vbox = QVBoxLayout(params_inner)
params_vbox.setSpacing(6)
params_vbox.addWidget(self._build_header())
params_vbox.addWidget(_hline())
params_vbox.addWidget(self._build_params_panel())
params_vbox.addLayout(self._build_edit_buttons())
params_vbox.addStretch()
params_scroll.setWidget(params_inner)
root.addWidget(params_scroll)
self._btn_queue = QPushButton("☰ Queue control…")
self._btn_queue.setToolTip("Open the tomo queue manager")
self._btn_queue.clicked.connect(self._show_queue_dialog)
root.addWidget(self._btn_queue)
self._queue_dlg: Optional["TomoQueueDialog"] = None
def _build_header(self) -> QGroupBox:
box = QGroupBox("Sample")
layout = QHBoxLayout(box)
self._lbl_sample = QLabel("---")
layout.addWidget(QLabel("Name:"))
layout.addWidget(self._lbl_sample)
layout.addStretch()
return box
def _build_params_panel(self) -> QGroupBox:
box = QGroupBox("Tomo parameters")
vbox = QVBoxLayout(box)
# tomo_type selector (always visible, drives section visibility)
type_row = QHBoxLayout()
type_row.addWidget(QLabel("Tomo type:"))
self._pw["tomo_type"] = QComboBox()
for tid, label in TOMO_TYPES.items():
self._pw["tomo_type"].addItem(f"{tid} {label}", tid)
self._pw["tomo_type"].currentIndexChanged.connect(self._on_type_changed)
type_row.addWidget(self._pw["tomo_type"], stretch=1)
vbox.addLayout(type_row)
vbox.addWidget(_hline())
# common parameters
common_form = QFormLayout()
common_form.setLabelAlignment(Qt.AlignmentFlag.AlignRight)
self._add_double(
common_form,
"tomo_countingtime",
"Counting time (s)",
min_=0.001,
max_=100.0,
decimals=3,
)
self._add_double(
common_form, "tomo_shellstep", "Shell step (µm)", min_=0.001, max_=1000.0, decimals=3
)
self._add_double(common_form, "fovx", "FOV x (µm)", min_=0.1, max_=200.0, decimals=2)
self._add_double(common_form, "fovy", "FOV y (µm)", min_=0.1, max_=100.0, decimals=2)
self._add_int(common_form, "stitch_x", "Stitch x", min_=0, max_=50)
self._add_int(common_form, "stitch_y", "Stitch y", min_=0, max_=50)
self._add_double(
common_form,
"tomo_stitch_overlap",
"Stitch overlap (µm)",
min_=0.0,
max_=50.0,
decimals=3,
)
self._add_text(common_form, "ptycho_reconstruct_foldername", "Reconstruct folder")
self._add_double(
common_form,
"manual_shift_y",
"Manual shift y (µm)",
min_=-1000.0,
max_=1000.0,
decimals=3,
)
self._add_int(common_form, "frames_per_trigger", "Frames / trigger", min_=1, max_=100)
self._add_double(
common_form,
"corridor_size",
"Corridor size (µm, 1=off)",
min_=-1.0,
max_=200.0,
decimals=2,
)
sp = self._add_bool(common_form, "single_point_instead_of_fermat_scan", "Single-point scan")
sp.stateChanged.connect(self._on_single_point_changed)
vbox.addLayout(common_form)
# type-1 section
self._sec_type1 = self._build_type1_section()
vbox.addWidget(self._sec_type1)
# type-2/3 section
self._sec_type23 = self._build_type23_section()
vbox.addWidget(self._sec_type23)
return box
def _build_type1_section(self) -> QGroupBox:
box = QGroupBox("Type 1 sub-tomogram settings")
form = QFormLayout(box)
form.setLabelAlignment(Qt.AlignmentFlag.AlignRight)
# angle range combo
self._pw["tomo_angle_range"] = QComboBox()
for v in (180, 360):
self._pw["tomo_angle_range"].addItem(f"{v}°", v)
self._pw["tomo_angle_range"].currentIndexChanged.connect(self._update_projection_preview)
form.addRow("Angle range:", self._pw["tomo_angle_range"])
# requested total projections (user entry in edit mode)
self._pw["_requested_total"] = QSpinBox()
self._pw["_requested_total"].setRange(8, 10000)
self._pw["_requested_total"].setSingleStep(8)
self._pw["_requested_total"].setValue(144)
self._pw["_requested_total"].valueChanged.connect(self._update_projection_preview)
form.addRow("Requested total projections:", self._pw["_requested_total"])
# read-only preview labels
self._lbl_actual_total = QLabel("---")
self._lbl_achievable_step = QLabel("---")
form.addRow("Actual achievable total:", self._lbl_actual_total)
form.addRow("Angular step (°):", self._lbl_achievable_step)
self._add_bool(form, "zero_deg_reference_at_each_subtomo", "0° reference each sub-tomo")
return box
def _build_type23_section(self) -> QGroupBox:
box = QGroupBox("Type 2 / 3 golden ratio settings")
form = QFormLayout(box)
form.setLabelAlignment(Qt.AlignmentFlag.AlignRight)
self._add_int(
form, "golden_ratio_bunch_size", "Bunch size (type 2 only)", min_=1, max_=10000
)
self._add_double(
form,
"golden_max_number_of_projections",
"Max projections (0 = ∞)",
min_=0.0,
max_=1e6,
decimals=0,
)
self._add_bool(
form,
"golden_projections_at_0_deg_for_damage_estimation",
"0° projections for damage estimation",
)
return box
def _build_edit_buttons(self) -> QHBoxLayout:
layout = QHBoxLayout()
self._btn_edit = QPushButton("Edit")
self._btn_submit = QPushButton("Submit")
self._btn_cancel = QPushButton("Cancel")
self._btn_submit.setVisible(False)
self._btn_cancel.setVisible(False)
self._btn_edit.clicked.connect(self.enter_edit_mode)
self._btn_submit.clicked.connect(self.submit_params)
self._btn_cancel.clicked.connect(self.cancel_edit)
layout.addStretch()
layout.addWidget(self._btn_edit)
layout.addWidget(self._btn_submit)
layout.addWidget(self._btn_cancel)
return layout
# ── widget factory helpers ────────────────────────────────────────────────
def _add_double(
self, form: QFormLayout, key: str, label: str, min_: float, max_: float, decimals: int
) -> QDoubleSpinBox:
w = QDoubleSpinBox()
w.setDecimals(decimals)
w.setRange(min_, max_)
w.setEnabled(False)
self._pw[key] = w
form.addRow(f"{label}:", w)
return w
def _add_int(self, form: QFormLayout, key: str, label: str, min_: int, max_: int) -> QSpinBox:
w = QSpinBox()
w.setRange(min_, max_)
w.setEnabled(False)
self._pw[key] = w
form.addRow(f"{label}:", w)
return w
def _add_text(self, form: QFormLayout, key: str, label: str) -> QLineEdit:
w = QLineEdit()
w.setEnabled(False)
self._pw[key] = w
form.addRow(f"{label}:", w)
return w
def _add_bool(self, form: QFormLayout, key: str, label: str) -> QCheckBox:
w = QCheckBox()
w.setEnabled(False)
self._pw[key] = w
form.addRow(f"{label}:", w)
return w
# ── populate / read field values ─────────────────────────────────────────
def _populate_fields(self, params: dict[str, Any]) -> None:
"""Fill all input widgets from a params dict (does not enable them)."""
type_val = int(params.get("tomo_type", 1))
idx = self._pw["tomo_type"].findData(type_val)
if idx >= 0:
self._pw["tomo_type"].blockSignals(True)
self._pw["tomo_type"].setCurrentIndex(idx)
self._pw["tomo_type"].blockSignals(False)
for key, widget in self._pw.items():
if key in ("tomo_type", "_requested_total"):
continue
val = params.get(key)
if val is None:
continue
if isinstance(widget, QDoubleSpinBox):
widget.setValue(float(val))
elif isinstance(widget, QSpinBox):
widget.setValue(int(val))
elif isinstance(widget, QLineEdit):
widget.setText(str(val))
elif isinstance(widget, QCheckBox):
widget.setChecked(bool(val))
elif isinstance(widget, QComboBox):
idx2 = widget.findData(int(val))
if idx2 >= 0:
widget.setCurrentIndex(idx2)
# derive requested_total from stored tomo_angle_stepsize
angle_range = int(params.get("tomo_angle_range", 180))
stepsize = float(params.get("tomo_angle_stepsize", 10.0))
actual_total, _, _ = _compute_type1(angle_range, stepsize)
self._pw["_requested_total"].blockSignals(True)
self._pw["_requested_total"].setValue(actual_total)
self._pw["_requested_total"].blockSignals(False)
self._update_projection_preview()
self._update_type_visibility(type_val)
def _read_fields(self) -> dict[str, Any]:
"""Read all input widgets into a params dict."""
params: dict[str, Any] = {}
params["tomo_type"] = self._pw["tomo_type"].currentData()
for key, widget in self._pw.items():
if key in ("tomo_type", "_requested_total"):
continue
if isinstance(widget, QDoubleSpinBox):
params[key] = widget.value()
elif isinstance(widget, QSpinBox):
params[key] = widget.value()
elif isinstance(widget, QLineEdit):
params[key] = widget.text().strip()
elif isinstance(widget, QCheckBox):
params[key] = widget.isChecked()
elif isinstance(widget, QComboBox):
params[key] = widget.currentData()
# derive tomo_angle_stepsize from requested_total + angle_range
angle_range = int(params.get("tomo_angle_range", 180))
requested = self._pw["_requested_total"].value()
stepsize = _requested_to_stepsize(angle_range, requested)
params["tomo_angle_stepsize"] = stepsize
# single_point forces stitch to 0
if params.get("single_point_instead_of_fermat_scan"):
params["stitch_x"] = 0
params["stitch_y"] = 0
return params
# ── validation ────────────────────────────────────────────────────────────
def _validate(self, params: dict[str, Any]) -> Optional[str]:
if params.get("fovx", 0) > 200:
return "fovx must be ≤ 200 µm"
if params.get("fovy", 0) > 100:
return "fovy must be ≤ 100 µm"
if not isinstance(params.get("stitch_x"), int):
return "stitch_x must be an integer"
if not isinstance(params.get("stitch_y"), int):
return "stitch_y must be an integer"
fpt = params.get("frames_per_trigger", 1)
if not isinstance(fpt, int) or fpt < 1 or isinstance(fpt, bool):
return "frames_per_trigger must be a positive integer"
if params.get("tomo_type") not in (1, 2, 3):
return "tomo_type must be 1, 2, or 3"
if params.get("tomo_angle_range") not in (180, 360):
return "tomo_angle_range must be 180 or 360"
return None
# ── type visibility ───────────────────────────────────────────────────────
def _update_type_visibility(self, tomo_type: int) -> None:
self._sec_type1.setVisible(tomo_type == 1)
self._sec_type23.setVisible(tomo_type in (2, 3))
# bunch_size is type-2 only; find its row in the form layout
bunch_widget = self._pw.get("golden_ratio_bunch_size")
if bunch_widget is not None:
bunch_widget.setVisible(tomo_type == 2)
# also hide the label in the form layout
form = self._sec_type23.layout()
if isinstance(form, QFormLayout):
idx = form.indexOf(bunch_widget)
if idx >= 0:
row, role = form.getItemPosition(idx)
label_item = form.itemAt(row, QFormLayout.ItemRole.LabelRole)
if label_item and label_item.widget():
label_item.widget().setVisible(tomo_type == 2)
# ── type-1 projection preview ─────────────────────────────────────────────
def _update_projection_preview(self) -> None:
angle_range_widget = self._pw.get("tomo_angle_range")
if angle_range_widget is None:
return
angle_range = int(angle_range_widget.currentData() or 180)
requested = self._pw["_requested_total"].value()
stepsize = _requested_to_stepsize(angle_range, requested)
actual_total, achievable_step, _ = _compute_type1(angle_range, stepsize)
self._lbl_actual_total.setText(str(actual_total))
self._lbl_achievable_step.setText(f"{achievable_step:.4f}")
if actual_total != requested:
self._lbl_actual_total.setStyleSheet("color: orange;")
self._lbl_actual_total.setToolTip(
f"Requested {requested} → actual {actual_total} "
f"(must be divisible into 8 equal sub-tomograms)"
)
else:
self._lbl_actual_total.setStyleSheet("")
self._lbl_actual_total.setToolTip("")
# ── slots ─────────────────────────────────────────────────────────────────
def _on_type_changed(self, _index: int) -> None:
tomo_type = self._pw["tomo_type"].currentData()
self._update_type_visibility(tomo_type)
def _on_single_point_changed(self, state: int) -> None:
checked = bool(state)
if checked:
self._pw["stitch_x"].setValue(0)
self._pw["stitch_y"].setValue(0)
self._pw["stitch_x"].setEnabled(self._edit_mode and not checked)
self._pw["stitch_y"].setEnabled(self._edit_mode and not checked)
def _on_poll(self) -> None:
"""Periodic refresh: params (guarded against edit mode) + sample name."""
self._refresh_params()
self._refresh_sample_name()
# ── public refresh API ────────────────────────────────────────────────────
def refresh(self) -> None:
"""Full refresh: params + sample name."""
self._refresh_params()
self._refresh_sample_name()
def _refresh_sample_name(self) -> None:
# Mirror the exact path used by Flomni.sample_get_name(0):
# getattr(dev.flomni_samples.sample_names, "sample0").get()
try:
name_signal = self.dev.flomni_samples.sample_names.sample0
name = name_signal.get()
self._lbl_sample.setText(str(name) if name else "---")
except Exception:
self._lbl_sample.setText("---")
def _refresh_params(self) -> None:
if self._edit_mode:
return
params = self._load_params()
self._populate_fields(params)
def _refresh_queue(self) -> None:
jobs = self._load_queue()
table = self._queue_table
table.setRowCount(len(jobs))
for row, job in enumerate(jobs):
params = job.get("params", {})
status = job.get("status", "pending")
color = STATUS_COLORS.get(status, "#888888")
cells = [
str(row),
job.get("label", f"job_{row}"),
status,
str(params.get("tomo_type", "?")),
f"{params.get('fovx', '?')} × {params.get('fovy', '?')}",
job.get("added_at", ""),
]
for col, text in enumerate(cells):
item = QTableWidgetItem(text)
item.setForeground(table.palette().text() if col != 2 else _color_from_hex(color))
table.setItem(row, col, item)
# ── edit-mode workflow ────────────────────────────────────────────────────
def enter_edit_mode(self) -> None:
"""Unlock all parameter fields for editing."""
# Refresh first so fields reflect latest backend state
params = self._load_params()
self._populate_fields(params)
self._set_fields_enabled(True)
self._edit_mode = True
self._btn_edit.setVisible(False)
self._btn_submit.setVisible(True)
self._btn_cancel.setVisible(True)
def cancel_edit(self) -> None:
"""Discard changes and re-lock fields."""
self._edit_mode = False
self._set_fields_enabled(False)
self._btn_edit.setVisible(True)
self._btn_submit.setVisible(False)
self._btn_cancel.setVisible(False)
# restore from backend
params = self._load_params()
self._populate_fields(params)
def submit_params(self) -> None:
"""Validate, write to global vars, and re-lock fields."""
params = self._read_fields()
error = self._validate(params)
if error:
QMessageBox.warning(self, "Validation error", error)
return
# Confirm if actual_total differs from requested (type-1)
if params.get("tomo_type") == 1:
angle_range = int(params.get("tomo_angle_range", 180))
requested = self._pw["_requested_total"].value()
stepsize = params["tomo_angle_stepsize"]
actual_total, _, _ = _compute_type1(angle_range, stepsize)
if actual_total != requested:
reply = QMessageBox.question(
self,
"Projection count adjusted",
f"Requested {requested} projections → achievable: {actual_total}.\n"
f"(Must be divisible into 8 equal sub-tomograms.)\n\n"
f"Submit with {actual_total} projections?",
QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.Cancel,
)
if reply != QMessageBox.StandardButton.Yes:
return
ok = True
for key, val in params.items():
if key in QUEUE_PARAM_NAMES:
if not self._gv_set(key, val):
ok = False
if ok:
self._edit_mode = False
self._set_fields_enabled(False)
self._btn_edit.setVisible(True)
self._btn_submit.setVisible(False)
self._btn_cancel.setVisible(False)
else:
QMessageBox.critical(self, "Error", "One or more parameters could not be written.")
def _set_fields_enabled(self, enabled: bool) -> None:
skip = {"_requested_total"} if not enabled else set()
for key, widget in self._pw.items():
if key in skip:
continue
widget.setEnabled(enabled)
# stitch locked when single_point is active
if enabled and self._pw["single_point_instead_of_fermat_scan"].isChecked():
self._pw["stitch_x"].setEnabled(False)
self._pw["stitch_y"].setEnabled(False)
# projection requested total only meaningful in edit mode
self._pw["_requested_total"].setEnabled(enabled)
# ── queue dialog ──────────────────────────────────────────────────────────
def add_to_queue(self) -> None:
"""Open queue dialog and trigger Add (also accessible via RPC)."""
dlg = self._get_queue_dialog()
dlg.show()
dlg.raise_()
dlg.add_to_queue()
def _show_queue_dialog(self) -> None:
dlg = self._get_queue_dialog()
dlg.show()
dlg.raise_()
dlg.activateWindow()
def _get_queue_dialog(self) -> "TomoQueueDialog":
if self._queue_dlg is None:
self._queue_dlg = TomoQueueDialog(self.client, parent=self)
return self._queue_dlg
# ── cleanup ───────────────────────────────────────────────────────────────
def cleanup(self) -> None:
self._poll_timer.stop()
super().cleanup()
# ── TomoQueueDialog ───────────────────────────────────────────────────────────
class TomoQueueDialog(QDialog):
"""
Non-modal popup window for managing the FlOMNI tomo scan queue.
Opened from the main TomoParamsWidget via the "Queue control…" button.
Shares the BEC client with its parent for global-var access.
Polls the queue every 2 s to stay in sync with CLI-driven changes.
"""
_POLL_MS = 2000
def __init__(self, client, parent=None):
super().__init__(parent)
self._client = client
self.setWindowTitle("FlOMNI Tomo Queue Control")
self.setMinimumSize(720, 360)
self._build_ui()
self._poll_timer = QTimer(self)
self._poll_timer.setInterval(self._POLL_MS)
self._poll_timer.timeout.connect(self._refresh)
self._poll_timer.start()
self._refresh()
# ── global-var helpers ────────────────────────────────────────────────────
def _gv_get(self, key: str):
try:
return self._client.get_global_var(key)
except Exception as exc:
logger.warning(f"TomoQueueDialog: get_global_var({key!r}) failed: {exc}")
return None
def _gv_set(self, key: str, value) -> bool:
try:
self._client.set_global_var(key, value)
return True
except Exception as exc:
logger.warning(f"TomoQueueDialog: set_global_var({key!r}) failed: {exc}")
return False
def _load_queue(self) -> list[dict]:
val = self._gv_get("tomo_queue")
return val if isinstance(val, list) else []
def _save_queue(self, jobs: list[dict]) -> None:
self._gv_set("tomo_queue", jobs)
def _load_params(self) -> dict:
params = {}
for key in QUEUE_PARAM_NAMES:
val = self._gv_get(key)
params[key] = val if val is not None else DEFAULTS.get(key)
return params
# ── UI ────────────────────────────────────────────────────────────────────
def _build_ui(self):
vbox = QVBoxLayout(self)
vbox.setSpacing(6)
self._table = QTableWidget(0, 6)
self._table.setHorizontalHeaderLabels(
["#", "Label", "Status", "Type", "FOV x×y (µm)", "Added at"]
)
self._table.horizontalHeader().setSectionResizeMode(QHeaderView.ResizeMode.Stretch)
self._table.setSelectionBehavior(QTableWidget.SelectionBehavior.SelectRows)
self._table.setEditTriggers(QTableWidget.EditTrigger.NoEditTriggers)
vbox.addWidget(self._table)
btn_row = QHBoxLayout()
btn_add = QPushButton("Add current params to queue")
btn_del = QPushButton("Delete selected")
btn_clr = QPushButton("Clear all")
btn_exe = QPushButton("Execute queue…")
btn_add.clicked.connect(self.add_to_queue)
btn_del.clicked.connect(self._delete_selected)
btn_clr.clicked.connect(self._clear_queue)
btn_exe.clicked.connect(self._show_execute_hint)
btn_row.addWidget(btn_add)
btn_row.addWidget(btn_del)
btn_row.addWidget(btn_clr)
btn_row.addStretch()
btn_row.addWidget(btn_exe)
vbox.addLayout(btn_row)
# ── refresh ───────────────────────────────────────────────────────────────
def _refresh(self) -> None:
jobs = self._load_queue()
self._table.setRowCount(len(jobs))
for row, job in enumerate(jobs):
params = job.get("params", {})
status = job.get("status", "pending")
color = STATUS_COLORS.get(status, "#888888")
cells = [
str(row),
job.get("label", f"job_{row}"),
status,
str(params.get("tomo_type", "?")),
f"{params.get('fovx', '?')} × {params.get('fovy', '?')}",
job.get("added_at", ""),
]
for col, text in enumerate(cells):
item = QTableWidgetItem(text)
if col == 2:
item.setForeground(_color_from_hex(color))
self._table.setItem(row, col, item)
# ── queue actions ─────────────────────────────────────────────────────────
def add_to_queue(self) -> None:
label, ok = QInputDialog.getText(self, "Add to queue", "Job label (leave blank for auto):")
if not ok:
return
params = self._load_params()
jobs = self._load_queue()
if not label.strip():
label = f"job_{len(jobs)}"
jobs.append(
{
"label": label.strip(),
"params": params,
"status": "pending",
"added_at": datetime.datetime.now().isoformat(timespec="seconds"),
}
)
self._save_queue(jobs)
self._refresh()
def _delete_selected(self) -> None:
rows = sorted({idx.row() for idx in self._table.selectedIndexes()}, reverse=True)
if not rows:
return
reply = QMessageBox.question(
self,
"Delete jobs",
f"Delete {len(rows)} selected job(s)?",
QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.Cancel,
)
if reply != QMessageBox.StandardButton.Yes:
return
jobs = self._load_queue()
for row in rows:
if row < len(jobs):
jobs.pop(row)
self._save_queue(jobs)
self._refresh()
def _clear_queue(self) -> None:
reply = QMessageBox.question(
self,
"Clear queue",
"Remove all jobs from the queue?",
QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.Cancel,
)
if reply != QMessageBox.StandardButton.Yes:
return
self._save_queue([])
self._refresh()
def _show_execute_hint(self) -> None:
jobs = self._load_queue()
pending = sum(1 for j in jobs if j.get("status") in ("pending", "incomplete", "running"))
QMessageBox.information(
self,
"Execute queue",
f"Queue: {len(jobs)} job(s), {pending} pending/resumable.\n\n"
"Execution is a blocking CLI operation — run from the BEC IPython session:\n\n"
" flomni.tomo_queue_execute()\n\n"
"Queue status updates here automatically once running.",
)
# ── cleanup ───────────────────────────────────────────────────────────────
def closeEvent(self, event):
self._poll_timer.stop()
super().closeEvent(event)
def showEvent(self, event):
self._poll_timer.start()
self._refresh()
super().showEvent(event)
# ── module-level helpers ─────────────────────────────────────────────────────
def _requested_to_stepsize(angle_range: int, requested_total: int) -> float:
"""Convert a desired total projection count to tomo_angle_stepsize."""
if requested_total <= 0:
return float(angle_range)
return (angle_range / requested_total) * 8
def _compute_type1(angle_range: int, stepsize: float) -> tuple[int, float, float]:
"""
Given stored tomo_angle_stepsize, return:
(actual_total, achievable_step, stepsize)
Mirrors the CLI's internal computation exactly.
"""
if stepsize <= 0:
return 0, 0.0, 0.0
N = int(angle_range / stepsize)
if N <= 0:
return 0, 0.0, 0.0
achievable_step = angle_range / N
actual_total = N * 8
return actual_total, achievable_step, stepsize
def _color_from_hex(hex_color: str):
from qtpy.QtGui import QColor
return QColor(hex_color)
@@ -0,0 +1,53 @@
from bec_widgets.utils.bec_designer import designer_material_icon
from qtpy.QtDesigner import QDesignerCustomWidgetInterface
from qtpy.QtWidgets import QWidget
from csaxs_bec.bec_widgets.widgets.tomo_params.tomo_params import TomoParamsWidget
DOM_XML = """
<ui language='c++'>
<widget class='TomoParamsWidget' name='tomo_params'>
</widget>
</ui>
"""
class TomoParamsPlugin(QDesignerCustomWidgetInterface): # pragma: no cover
def __init__(self):
super().__init__()
self._form_editor = None
def createWidget(self, parent):
if parent is None:
return QWidget()
return TomoParamsWidget(parent)
def domXml(self):
return DOM_XML
def group(self):
return ""
def icon(self):
return designer_material_icon(TomoParamsWidget.ICON_NAME)
def includeFile(self):
return "tomo_params"
def initialize(self, form_editor):
self._form_editor = form_editor
def isContainer(self):
return False
def isInitialized(self):
return self._form_editor is not None
def name(self):
return "TomoParamsWidget"
def toolTip(self):
return "TomoParamsWidget"
def whatsThis(self):
return self.toolTip()
@@ -0,0 +1 @@
{'files': ['tomo_params.py']}
@@ -0,0 +1,57 @@
# Copyright (C) 2022 The Qt Company Ltd.
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause
from qtpy.QtDesigner import QDesignerCustomWidgetInterface
from qtpy.QtWidgets import QWidget
from bec_widgets.utils.bec_designer import designer_material_icon
from csaxs_bec.bec_widgets.widgets.tomo_params.tomo_params import TomoParamsWidget
DOM_XML = """
<ui language='c++'>
<widget class='TomoParamsWidget' name='tomo_params_widget'>
</widget>
</ui>
"""
class TomoParamsWidgetPlugin(QDesignerCustomWidgetInterface): # pragma: no cover
def __init__(self):
super().__init__()
self._form_editor = None
def createWidget(self, parent):
if parent is None:
return QWidget()
t = TomoParamsWidget(parent)
return t
def domXml(self):
return DOM_XML
def group(self):
return ""
def icon(self):
return designer_material_icon(TomoParamsWidget.ICON_NAME)
def includeFile(self):
return "tomo_params_widget"
def initialize(self, form_editor):
self._form_editor = form_editor
def isContainer(self):
return False
def isInitialized(self):
return self._form_editor is not None
def name(self):
return "TomoParamsWidget"
def toolTip(self):
return ""
def whatsThis(self):
return self.toolTip()