1
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2026-03-30 13:28:07 +02:00

feat(waveform): 1D alignment mode panel

This commit is contained in:
2026-03-19 14:44:30 +01:00
committed by Jan Wyzula
parent 31389a3dd0
commit a486c52058
6 changed files with 1226 additions and 4 deletions

View File

@@ -0,0 +1,342 @@
from __future__ import annotations
from dataclasses import dataclass
import pyqtgraph as pg
from qtpy.QtCore import QObject, Qt, Signal
from qtpy.QtGui import QColor
from bec_widgets.utils.colors import get_accent_colors, get_theme_name
from bec_widgets.utils.error_popups import SafeSlot
from bec_widgets.widgets.plots.waveform.utils.alignment_panel import WaveformAlignmentPanel
@dataclass(slots=True)
class AlignmentContext:
"""Alignment state produced by `Waveform` and consumed by the controller.
Attributes:
visible: Whether alignment mode is currently visible.
positioner_name: Name of the resolved x-axis positioner, if available.
precision: Decimal precision to use for readback and target labels.
limits: Optional positioner limits for the draggable target line.
readback: Current cached positioner readback value.
has_dap_curves: Whether the waveform currently contains any DAP curves.
force_readback: Whether the embedded positioner should refresh its readback immediately.
"""
visible: bool
positioner_name: str | None
precision: int = 3
limits: tuple[float, float] | None = None
readback: float | None = None
has_dap_curves: bool = False
force_readback: bool = False
class WaveformAlignmentController(QObject):
"""Own the alignment plot overlays and synchronize them with the alignment panel."""
move_absolute_requested = Signal(float)
def __init__(self, plot_item: pg.PlotItem, panel: WaveformAlignmentPanel, parent=None):
super().__init__(parent=parent)
self._plot_item = plot_item
self._panel = panel
self._visible = False
self._positioner_name: str | None = None
self._precision = 3
self._limits: tuple[float, float] | None = None
self._readback: float | None = None
self._marker_line: pg.InfiniteLine | None = None
self._target_line: pg.InfiniteLine | None = None
self._panel.position_readback_changed.connect(self.update_position)
self._panel.target_toggled.connect(self._on_target_toggled)
self._panel.target_move_requested.connect(self._on_target_move_requested)
self._panel.fit_selection_changed.connect(self._on_fit_selection_changed)
self._panel.fit_center_requested.connect(self._on_fit_center_requested)
@property
def marker_line(self) -> pg.InfiniteLine | None:
"""Return the current-position indicator line, if it exists."""
return self._marker_line
@property
def target_line(self) -> pg.InfiniteLine | None:
"""Return the draggable target indicator line, if it exists."""
return self._target_line
def update_context(self, context: AlignmentContext):
"""Apply waveform-owned alignment context to the panel and plot overlays.
Args:
context: Snapshot of the current alignment-relevant waveform/device state.
"""
previous_name = self._positioner_name
self._visible = context.visible
self._positioner_name = context.positioner_name
self._precision = context.precision
self._limits = context.limits
self._readback = context.readback
self._panel.set_positioner_device(context.positioner_name)
self._panel.set_positioner_enabled(context.visible and context.positioner_name is not None)
self._panel.set_status_message(self._status_message_for_context(context))
if context.positioner_name is None or not context.visible:
self.clear()
self._refresh_fit_actions()
self._refresh_target_controls()
return
if previous_name != context.positioner_name:
self._clear_marker()
if self._panel.target_active:
self._clear_target_line()
if context.readback is not None:
self.update_position(context.readback)
if self._panel.target_active:
if previous_name != context.positioner_name or self._target_line is None:
self._show_target_line()
else:
self._refresh_target_line_metadata()
self._on_target_line_changed()
if context.force_readback or previous_name != context.positioner_name:
self._panel.force_positioner_readback()
self._refresh_fit_actions()
self._refresh_target_controls()
@SafeSlot(float)
def update_position(self, position: float):
"""Update the live position marker from a positioner readback value.
Args:
position: Current absolute position of the active alignment positioner.
"""
self._readback = float(position)
if not self._visible or self._positioner_name is None:
self._clear_marker()
return
self._ensure_marker()
self._marker_line.setValue(self._readback)
self._marker_line.label.setText(
f"{self._positioner_name}: {self._readback:.{self._precision}f}"
)
@SafeSlot(dict, dict)
def update_dap_summary(self, data: dict, metadata: dict):
"""Forward DAP summary updates into the alignment fit panel.
Args:
data: DAP fit summary payload.
metadata: Metadata describing the emitting DAP curve.
"""
self._panel.update_dap_summary(data, metadata)
self._refresh_fit_actions()
@SafeSlot(str)
def remove_dap_curve(self, curve_id: str):
"""Remove a deleted DAP curve from the alignment fit selection state.
Args:
curve_id: Label of the DAP curve that was removed from the waveform.
"""
self._panel.remove_dap_curve(curve_id)
self._panel.clear_fit_selection_if_missing()
self._refresh_fit_actions()
def clear(self):
"""Remove alignment overlay items from the plot and reset target state."""
self._clear_marker()
self._clear_target_line()
def cleanup(self):
"""Disconnect panel signals and remove all controller-owned overlay items."""
self.clear()
self._disconnect_panel_signals()
def refresh_theme_colors(self):
"""Reapply theme-aware styling to any existing alignment overlay items."""
self._apply_marker_style()
self._apply_target_style()
def _disconnect_panel_signals(self):
signal_pairs = [
(self._panel.position_readback_changed, self.update_position),
(self._panel.target_toggled, self._on_target_toggled),
(self._panel.target_move_requested, self._on_target_move_requested),
(self._panel.fit_selection_changed, self._on_fit_selection_changed),
(self._panel.fit_center_requested, self._on_fit_center_requested),
]
for signal, slot in signal_pairs:
try:
signal.disconnect(slot)
except (RuntimeError, TypeError):
continue
def _selected_fit_has_center(self) -> bool:
data = self._panel.selected_fit_summary()
params = data.get("params", []) if isinstance(data, dict) else []
return any(param[0] == "center" for param in params if param)
@staticmethod
def _status_message_for_context(context: AlignmentContext) -> str | None:
if context.positioner_name is None:
return "Alignment mode requires a positioner on the x axis."
if not context.has_dap_curves:
return "Add a DAP curve in Curve Settings to enable alignment fitting."
return None
def _refresh_fit_actions(self):
self._panel.set_fit_actions_enabled(
self._visible and self._positioner_name is not None and self._selected_fit_has_center()
)
def _refresh_target_controls(self):
has_positioner = self._visible and self._positioner_name is not None
self._panel.set_target_enabled(has_positioner)
self._panel.set_target_move_enabled(has_positioner and self._target_line is not None)
if self._target_line is None:
self._panel.set_target_value(None)
def _ensure_marker(self):
if self._marker_line is not None:
return
warning = get_accent_colors().warning
self._marker_line = pg.InfiniteLine(
angle=90,
movable=False,
pen=pg.mkPen(warning, width=4),
label="",
labelOpts={"position": 0.95, "color": warning},
)
self._marker_line.skip_auto_range = True
self._apply_marker_style()
self._plot_item.addItem(self._marker_line)
def _clear_marker(self):
if self._marker_line is None:
return
self._plot_item.removeItem(self._marker_line)
self._marker_line = None
def _show_target_line(self):
if not self._visible or self._positioner_name is None:
return
if self._target_line is None:
accent_colors = get_accent_colors()
label = f"{self._positioner_name} target={{value:0.{self._precision}f}}"
self._target_line = pg.InfiniteLine(
movable=True,
angle=90,
pen=pg.mkPen(accent_colors.default, width=2, style=Qt.PenStyle.DashLine),
hoverPen=pg.mkPen(accent_colors.success, width=2),
label=label,
labelOpts={"movable": True, "color": accent_colors.default},
)
self._target_line.sigPositionChanged.connect(self._on_target_line_changed)
self._apply_target_style()
self._plot_item.addItem(self._target_line)
self._refresh_target_line_metadata()
value = 0.0 if self._readback is None else self._readback
if self._limits is not None:
value = min(max(value, self._limits[0]), self._limits[1])
self._target_line.setValue(value)
self._on_target_line_changed()
def _refresh_target_line_metadata(self):
if self._target_line is None or self._positioner_name is None:
return
self._apply_target_style()
self._target_line.label.setFormat(
f"{self._positioner_name} target={{value:0.{self._precision}f}}"
)
if self._limits is not None:
self._target_line.setBounds(list(self._limits))
else:
self._target_line.setBounds((None, None))
if self._limits is not None:
current_value = float(self._target_line.value())
clamped_value = min(max(current_value, self._limits[0]), self._limits[1])
if clamped_value != current_value:
self._target_line.setValue(clamped_value)
def _clear_target_line(self):
if self._target_line is not None:
try:
self._target_line.sigPositionChanged.disconnect(self._on_target_line_changed)
except (RuntimeError, TypeError):
pass
self._plot_item.removeItem(self._target_line)
self._target_line = None
self._panel.set_target_value(None)
def _apply_marker_style(self):
if self._marker_line is None:
return
accent_colors = get_accent_colors()
warning = accent_colors.warning
self._marker_line.setPen(pg.mkPen(warning, width=4))
self._marker_line.label.setColor(warning)
self._marker_line.label.fill = pg.mkBrush(self._label_fill_color())
def _apply_target_style(self):
if self._target_line is None:
return
accent_colors = get_accent_colors()
default = accent_colors.default
success = accent_colors.success
self._target_line.setPen(pg.mkPen(default, width=2, style=Qt.PenStyle.DashLine))
self._target_line.setHoverPen(pg.mkPen(success, width=2))
self._target_line.label.setColor(default)
self._target_line.label.fill = pg.mkBrush(self._label_fill_color())
@staticmethod
def _label_fill_color() -> QColor:
if get_theme_name() == "light":
return QColor(244, 244, 244, 228)
return QColor(48, 48, 48, 210)
@SafeSlot(bool)
def _on_target_toggled(self, checked: bool):
if checked:
self._show_target_line()
else:
self._clear_target_line()
self._refresh_target_controls()
@SafeSlot(object)
def _on_target_line_changed(self, _line=None):
if self._target_line is None:
return
self._panel.set_target_value(float(self._target_line.value()), precision=self._precision)
self._refresh_target_controls()
@SafeSlot()
def _on_target_move_requested(self):
if self._visible and self._positioner_name is not None and self._target_line is not None:
self.move_absolute_requested.emit(float(self._target_line.value()))
@SafeSlot(str)
def _on_fit_selection_changed(self, _curve_id: str):
self._refresh_fit_actions()
@SafeSlot(float)
def _on_fit_center_requested(self, value: float):
if self._visible and self._positioner_name is not None:
self.move_absolute_requested.emit(float(value))

View File

@@ -0,0 +1,285 @@
from __future__ import annotations
from qtpy.QtCore import Qt, Signal
from qtpy.QtGui import QColor
from qtpy.QtWidgets import (
QCheckBox,
QGridLayout,
QGroupBox,
QHBoxLayout,
QLabel,
QPushButton,
QSizePolicy,
QWidget,
)
from bec_widgets.utils.colors import get_accent_colors, get_theme_name
from bec_widgets.widgets.control.device_control.positioner_box.positioner_control_line.positioner_control_line import (
PositionerControlLine,
)
from bec_widgets.widgets.dap.lmfit_dialog.lmfit_dialog import LMFitDialog
class WaveformAlignmentPanel(QWidget):
"""Compact bottom panel used by Waveform alignment mode."""
position_readback_changed = Signal(float)
target_toggled = Signal(bool)
target_move_requested = Signal()
fit_selection_changed = Signal(str)
fit_center_requested = Signal(float)
def __init__(self, parent=None, client=None, gui_id: str | None = None, **kwargs):
super().__init__(parent=parent, **kwargs)
self.setProperty("skip_settings", True)
self.positioner = PositionerControlLine(parent=self, client=client, gui_id=gui_id)
self.positioner.hide_device_selection = True
self.fit_dialog = LMFitDialog(
parent=self, client=client, gui_id=gui_id, ui_file="lmfit_dialog_compact.ui"
)
self.fit_dialog.active_action_list = ["center"]
self.fit_dialog.enable_actions = False
self.target_toggle = QCheckBox("Target: --", parent=self)
self.move_to_target_button = QPushButton("Move To Target", parent=self)
self.move_to_target_button.setEnabled(False)
self.target_group = QGroupBox("Target Position", parent=self)
self.status_label = QLabel(parent=self)
self.status_label.setWordWrap(False)
self.status_label.setAlignment(
Qt.AlignmentFlag.AlignHCenter | Qt.AlignmentFlag.AlignVCenter
)
self.status_label.setSizePolicy(QSizePolicy.Policy.Maximum, QSizePolicy.Policy.Fixed)
self.status_label.setMaximumHeight(28)
self.status_label.setVisible(False)
self._init_ui()
self.fit_dialog.setMinimumHeight(0)
self.target_group.setSizePolicy(QSizePolicy.Policy.Fixed, QSizePolicy.Policy.Fixed)
self._sync_target_group_size()
self.refresh_theme_colors()
self._connect_signals()
def _connect_signals(self):
self.positioner.position_update.connect(self.position_readback_changed)
self.target_toggle.toggled.connect(self.target_toggled)
self.move_to_target_button.clicked.connect(self.target_move_requested)
self.fit_dialog.selected_fit.connect(self.fit_selection_changed)
self.fit_dialog.move_action.connect(self._forward_fit_move_action)
def _init_ui(self):
self.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Preferred)
self.setMinimumHeight(260)
root = QGridLayout(self)
root.setContentsMargins(8, 8, 8, 8)
root.setSpacing(8)
self.fit_dialog.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Preferred)
root.addWidget(
self.status_label,
0,
0,
1,
2,
alignment=Qt.AlignmentFlag.AlignHCenter | Qt.AlignmentFlag.AlignVCenter,
)
root.addWidget(self.fit_dialog, 1, 0, 1, 2)
target_layout = QHBoxLayout(self.target_group)
target_layout.addWidget(self.target_toggle)
target_layout.addWidget(self.move_to_target_button)
root.addWidget(self.positioner, 2, 0, alignment=Qt.AlignmentFlag.AlignTop)
root.addWidget(self.target_group, 2, 1, alignment=Qt.AlignmentFlag.AlignTop)
root.setColumnStretch(0, 1)
root.setColumnStretch(1, 0)
root.setRowStretch(1, 1)
def _sync_target_group_size(self):
representative_text = "Target: -99999.999"
label_width = max(
self.target_toggle.sizeHint().width(),
self.target_toggle.fontMetrics().horizontalAdvance(representative_text) + 24,
)
self.target_toggle.setMinimumWidth(label_width)
# To make those two box the same height
target_height = max(
self.positioner.height(),
self.positioner.ui.device_box.minimumSizeHint().height(),
self.positioner.ui.device_box.sizeHint().height(),
)
self.target_group.setFixedHeight(target_height)
self.target_group.setFixedWidth(self.target_group.sizeHint().width() + 16)
def showEvent(self, event):
super().showEvent(event)
self._sync_target_group_size()
def resizeEvent(self, event):
super().resizeEvent(event)
self._sync_target_group_size()
def set_status_message(self, text: str | None):
"""Show or hide the alignment status pill.
Args:
text: Message to display. Pass `None` or an empty string to hide the pill.
"""
text = text or ""
self.status_label.setText(text)
self.status_label.setVisible(bool(text))
@staticmethod
def _qcolor_to_rgba(color: QColor, alpha: int | None = None) -> str:
if alpha is not None:
color = QColor(color)
color.setAlpha(alpha)
return f"rgba({color.red()}, {color.green()}, {color.blue()}, {color.alpha()})"
def refresh_theme_colors(self):
"""Apply theme-aware accent styling to the status pill."""
warning = get_accent_colors().warning
is_light = get_theme_name() == "light"
text_color = "#202124" if is_light else warning.name()
fill_alpha = 72 if is_light else 48
border_alpha = 220 if is_light else 160
self.status_label.setStyleSheet(f"""
QLabel {{
background-color: {self._qcolor_to_rgba(warning, fill_alpha)};
border: 1px solid {self._qcolor_to_rgba(warning, border_alpha)};
border-radius: 12px;
padding: 4px 10px;
color: {text_color};
}}
""")
def set_positioner_device(self, device: str | None):
"""Bind the embedded positioner control to a fixed device.
Args:
device: Name of the positioner device to display, or `None` to clear it.
"""
if device is None:
self.positioner.ui.device_box.setTitle("No positioner selected")
return
if self.positioner.device != device:
self.positioner.set_positioner(device)
self.positioner.hide_device_selection = True
def set_positioner_enabled(self, enabled: bool):
"""Enable or disable the embedded positioner widget.
Args:
enabled: Whether the positioner widget should accept interaction.
"""
self.positioner.setEnabled(enabled)
def force_positioner_readback(self):
"""Trigger an immediate readback refresh on the embedded positioner widget."""
self.positioner.force_update_readback()
def set_target_enabled(self, enabled: bool):
"""Enable or disable the target-line toggle.
Args:
enabled: Whether the target toggle should accept interaction.
"""
self.target_toggle.setEnabled(enabled)
def set_target_move_enabled(self, enabled: bool):
"""Enable or disable the move-to-target button.
Args:
enabled: Whether the move button should accept interaction.
"""
self.move_to_target_button.setEnabled(enabled)
def set_target_active(self, active: bool):
"""Programmatically toggle the draggable target-line state.
Args:
active: Whether the target line should be considered active.
"""
blocker = self.target_toggle.blockSignals(True)
self.target_toggle.setChecked(active)
self.target_toggle.blockSignals(blocker)
if not active:
self.set_target_value(None)
def set_target_value(self, value: float | None, precision: int = 3) -> None:
"""
Update the target checkbox label for the draggable target line.
Args:
value(float | None): The target value to display. If None, the label will show "--".
precision(int): The number of decimal places to display for the target value.
"""
if value is None or not self.target_toggle.isChecked():
self.target_toggle.setText("Target: --")
return
self.target_toggle.setText(f"Target: {value:.{precision}f}")
def set_fit_actions_enabled(self, enabled: bool):
"""Enable or disable LMFit action buttons in the embedded fit dialog.
Args:
enabled: Whether fit action buttons should be enabled.
"""
self.fit_dialog.enable_actions = enabled
def update_dap_summary(self, data: dict, metadata: dict):
"""Forward a DAP summary update into the embedded fit dialog.
Args:
data: DAP fit summary payload.
metadata: Metadata describing the emitting DAP curve.
"""
self.fit_dialog.update_summary_tree(data, metadata)
def remove_dap_curve(self, curve_id: str):
"""Remove DAP summary state for a deleted fit curve.
Args:
curve_id: Label of the DAP curve that should be removed.
"""
self.fit_dialog.remove_dap_data(curve_id)
def clear_fit_selection_if_missing(self):
"""Select a remaining fit curve if the current selection no longer exists."""
fit_curve_id = self.fit_dialog.fit_curve_id
if fit_curve_id is not None and fit_curve_id not in self.fit_dialog.summary_data:
remaining = list(self.fit_dialog.summary_data)
self.fit_dialog.fit_curve_id = remaining[0] if remaining else None
@property
def target_active(self) -> bool:
"""Whether the target-line checkbox is currently checked."""
return self.target_toggle.isChecked()
@property
def selected_fit_curve_id(self) -> str | None:
"""Return the currently selected fit curve label, if any."""
return self.fit_dialog.fit_curve_id
def selected_fit_summary(self) -> dict | None:
"""Return the summary payload for the currently selected fit curve.
Returns:
The selected fit summary, or `None` if no fit curve is selected.
"""
fit_curve_id = self.selected_fit_curve_id
if fit_curve_id is None:
return None
return self.fit_dialog.summary_data.get(fit_curve_id)
def _forward_fit_move_action(self, action: tuple[str, float]):
param_name, param_value = action
if param_name == "center":
self.fit_center_requested.emit(float(param_value))

View File

@@ -6,6 +6,7 @@ from typing import TYPE_CHECKING, Literal
import numpy as np
import pyqtgraph as pg
from bec_lib import bec_logger, messages
from bec_lib.device import Positioner
from bec_lib.endpoints import MessageEndpoints
from bec_lib.lmfit_serializer import serialize_lmfit_params, serialize_param_object
from bec_lib.scan_data_container import ScanDataContainer
@@ -30,11 +31,18 @@ from bec_widgets.utils.colors import Colors, apply_theme
from bec_widgets.utils.container_utils import WidgetContainerUtils
from bec_widgets.utils.error_popups import SafeProperty, SafeSlot
from bec_widgets.utils.settings_dialog import SettingsDialog
from bec_widgets.utils.side_panel import SidePanel
from bec_widgets.utils.toolbars.bundles import ToolbarBundle
from bec_widgets.utils.toolbars.toolbar import MaterialIconAction
from bec_widgets.widgets.dap.lmfit_dialog.lmfit_dialog import LMFitDialog
from bec_widgets.widgets.plots.plot_base import PlotBase
from bec_widgets.widgets.plots.waveform.curve import Curve, CurveConfig, DeviceSignal
from bec_widgets.widgets.plots.waveform.settings.curve_settings.curve_setting import CurveSetting
from bec_widgets.widgets.plots.waveform.utils.alignment_controller import (
AlignmentContext,
WaveformAlignmentController,
)
from bec_widgets.widgets.plots.waveform.utils.alignment_panel import WaveformAlignmentPanel
from bec_widgets.widgets.plots.waveform.utils.roi_manager import WaveformROIManager
from bec_widgets.widgets.services.scan_history_browser.scan_history_browser import (
ScanHistoryBrowser,
@@ -156,6 +164,12 @@ class Waveform(PlotBase):
"label_suffix": "",
}
self._current_x_device: tuple[str, str] | None = None
self._alignment_panel_visible = False
self._alignment_side_panel: SidePanel | None = None
self._alignment_panel_index: int | None = None
self._alignment_panel: WaveformAlignmentPanel | None = None
self._alignment_controller: WaveformAlignmentController | None = None
self._alignment_positioner_name: str | None = None
# Specific GUI elements
self._init_roi_manager()
@@ -165,6 +179,7 @@ class Waveform(PlotBase):
self._add_waveform_specific_popup()
self._enable_roi_toolbar_action(False) # default state where are no dap curves
self._init_curve_dialog()
self._init_alignment_mode()
self.curve_settings_dialog = None
# Largedataset guard
@@ -195,7 +210,9 @@ class Waveform(PlotBase):
# To fix the ViewAll action with clipToView activated
self._connect_viewbox_menu_actions()
self.toolbar.show_bundles(["plot_export", "mouse_interaction", "roi", "axis_popup"])
self.toolbar.show_bundles(
["plot_export", "mouse_interaction", "roi", "alignment_mode", "axis_popup"]
)
def _connect_viewbox_menu_actions(self):
"""Connect the viewbox menu action ViewAll to the custom reset_view method."""
@@ -221,6 +238,12 @@ class Waveform(PlotBase):
theme(str, optional): The theme to be applied.
"""
self._refresh_colors()
alignment_panel = getattr(self, "_alignment_panel", None)
alignment_controller = getattr(self, "_alignment_controller", None)
if alignment_panel is not None:
alignment_panel.refresh_theme_colors()
if alignment_controller is not None:
alignment_controller.refresh_theme_colors()
super().apply_theme(theme)
def add_side_menus(self):
@@ -230,6 +253,153 @@ class Waveform(PlotBase):
super().add_side_menus()
self._add_dap_summary_side_menu()
def _init_alignment_mode(self):
"""
Initialize the top alignment panel.
"""
self.toolbar.components.add_safe(
"alignment_mode",
MaterialIconAction(
icon_name="align_horizontal_center",
tooltip="Show Alignment Mode",
checkable=True,
parent=self,
),
)
bundle = ToolbarBundle("alignment_mode", self.toolbar.components)
bundle.add_action("alignment_mode")
self.toolbar.add_bundle(bundle)
shown_bundles = list(self.toolbar.shown_bundles)
if "alignment_mode" not in shown_bundles:
shown_bundles.append("alignment_mode")
self.toolbar.show_bundles(shown_bundles)
self._alignment_side_panel = SidePanel(
parent=self, orientation="top", panel_max_width=320, show_toolbar=False
)
self.layout_manager.add_widget_relative(
self._alignment_side_panel,
self.round_plot_widget,
position="top",
shift_direction="down",
)
self._alignment_panel = WaveformAlignmentPanel(parent=self, client=self.client)
self._alignment_controller = WaveformAlignmentController(
self.plot_item, self._alignment_panel, parent=self
)
self._alignment_panel_index = self._alignment_side_panel.add_menu(
widget=self._alignment_panel
)
self._alignment_controller.move_absolute_requested.connect(self._move_alignment_positioner)
self.dap_summary_update.connect(self._alignment_controller.update_dap_summary)
self.toolbar.components.get_action("alignment_mode").action.toggled.connect(
self.toggle_alignment_mode
)
self._refresh_alignment_state()
@SafeSlot(bool)
def toggle_alignment_mode(self, checked: bool):
"""
Show or hide the alignment panel.
Args:
checked(bool): Whether the panel should be visible.
"""
if self._alignment_side_panel is None or self._alignment_panel_index is None:
return
self._alignment_panel_visible = checked
if checked:
self._alignment_side_panel.show_panel(self._alignment_panel_index)
self._refresh_alignment_state(force_readback=True)
self._refresh_dap_signals()
else:
self._alignment_side_panel.hide_panel()
self._refresh_alignment_state()
def _refresh_alignment_state(self, force_readback: bool = False):
"""
Refresh the alignment panel state after waveform changes.
Args:
force_readback(bool): Force a positioner readback refresh.
"""
if self._alignment_controller is None:
return
context = self._build_alignment_context(force_readback=force_readback)
self._alignment_positioner_name = context.positioner_name
self._alignment_controller.update_context(context)
def _resolve_alignment_positioner(self) -> str | None:
"""
Resolve the active x-axis positioner for alignment mode.
"""
if self.x_axis_mode["name"] in {"index", "timestamp"}:
return None
if self.x_axis_mode["name"] == "auto":
device_name = self._current_x_device[0] if self._current_x_device is not None else None
else:
device_name = self.x_axis_mode["name"]
if not device_name or device_name not in self.dev:
return None
if not isinstance(self.dev[device_name], Positioner):
return None
return device_name
def _build_alignment_context(self, force_readback: bool = False) -> AlignmentContext:
"""Build controller-facing alignment context from waveform/device state."""
positioner_name = self._resolve_alignment_positioner()
if positioner_name is None:
return AlignmentContext(
visible=self._alignment_panel_visible,
positioner_name=None,
has_dap_curves=bool(self._dap_curves),
force_readback=force_readback,
)
precision = getattr(self.dev[positioner_name], "precision", 3)
try:
precision = int(precision)
except (TypeError, ValueError):
precision = 3
limits = getattr(self.dev[positioner_name], "limits", None)
parsed_limits: tuple[float, float] | None = None
if limits is not None and len(limits) == 2:
low, high = float(limits[0]), float(limits[1])
if low != 0 or high != 0:
if low > high:
low, high = high, low
parsed_limits = (low, high)
data = self.dev[positioner_name].read(cached=True)
value = data.get(positioner_name, {}).get("value")
readback = None if value is None else float(value)
return AlignmentContext(
visible=self._alignment_panel_visible,
positioner_name=positioner_name,
precision=precision,
limits=parsed_limits,
readback=readback,
has_dap_curves=bool(self._dap_curves),
force_readback=force_readback,
)
@SafeSlot(float)
def _move_alignment_positioner(self, value: float):
"""
Move the active alignment positioner to an absolute value requested by the controller.
"""
if self._alignment_positioner_name is None:
return
self.dev[self._alignment_positioner_name].move(float(value), relative=False)
def _add_waveform_specific_popup(self):
"""
Add popups to the Waveform widget.
@@ -266,7 +436,7 @@ class Waveform(PlotBase):
Due to setting clipToView to True on the curves, the autoRange() method
of the ViewBox does no longer work as expected. This method deactivates the
setClipToView for all curves, calls autoRange() to circumvent that issue.
Afterwards, it re-enables the setClipToView for all curves again.
Afterward, it re-enables the setClipToView for all curves again.
It is hooked to the ViewAll action in the right-click menu of the pg.PlotItem ViewBox.
"""
@@ -544,6 +714,7 @@ class Waveform(PlotBase):
self.sync_signal_update.emit()
self.plot_item.enableAutoRange(x=True)
self.round_plot_widget.apply_plot_widget_style() # To keep the correct theme
self._refresh_alignment_state(force_readback=True)
@SafeProperty(str)
def signal_x(self) -> str | None:
@@ -573,6 +744,7 @@ class Waveform(PlotBase):
self.sync_signal_update.emit()
self.plot_item.enableAutoRange(x=True)
self.round_plot_widget.apply_plot_widget_style()
self._refresh_alignment_state(force_readback=True)
@SafeProperty(str)
def color_palette(self) -> str:
@@ -627,6 +799,8 @@ class Waveform(PlotBase):
continue
config = CurveConfig(**cfg_dict)
self._add_curve(config=config)
self._refresh_alignment_state(force_readback=self._alignment_panel_visible)
self._refresh_dap_signals()
except json.JSONDecodeError as e:
logger.error(f"Failed to decode JSON: {e}")
@@ -1002,6 +1176,7 @@ class Waveform(PlotBase):
QTimer.singleShot(
150, self.auto_range
) # autorange with a delay to ensure the plot is updated
self._refresh_alignment_state()
return curve
@@ -1257,6 +1432,7 @@ class Waveform(PlotBase):
self.remove_curve(curve.name())
if self.crosshair is not None:
self.crosshair.clear_markers()
self._refresh_alignment_state()
def get_curve(self, curve: int | str) -> Curve | None:
"""
@@ -1292,6 +1468,7 @@ class Waveform(PlotBase):
self._refresh_colors()
self._categorise_device_curves()
self._refresh_alignment_state()
def _remove_curve_by_name(self, name: str):
"""
@@ -1342,6 +1519,8 @@ class Waveform(PlotBase):
and self.enable_side_panel is True
):
self.dap_summary.remove_dap_data(curve.name())
if curve.config.source == "dap" and self._alignment_controller is not None:
self._alignment_controller.remove_dap_curve(curve.name())
# find a corresponding dap curve and remove it
for c in self.curves:
@@ -1983,6 +2162,7 @@ class Waveform(PlotBase):
"""
x_data = None
new_suffix = None
previous_x_device = self._current_x_device
data, access_key = self._fetch_scan_data_and_access()
# 1 User wants custom signal
@@ -2041,6 +2221,7 @@ class Waveform(PlotBase):
if not scan_report_devices:
x_data = None
new_suffix = " (auto: index)"
self._current_x_device = None
else:
device_x = scan_report_devices[0]
signal_x = self.entry_validator.validate_signal(device_x, None)
@@ -2050,8 +2231,10 @@ class Waveform(PlotBase):
entry_obj = data.get(device_x, {}).get(signal_x)
x_data = entry_obj.read()["value"] if entry_obj else None
new_suffix = f" (auto: {device_x}-{signal_x})"
self._current_x_device = (device_x, signal_x)
self._current_x_device = (device_x, signal_x)
self._update_x_label_suffix(new_suffix)
if previous_x_device != self._current_x_device:
self._refresh_alignment_state(force_readback=True)
return x_data
def _update_x_label_suffix(self, new_suffix: str):
@@ -2096,7 +2279,7 @@ class Waveform(PlotBase):
def _categorise_device_curves(self) -> str:
"""
Categorise the device curves into sync and async based on the readout priority.
Categorize the device curves into sync and async based on the readout priority.
"""
if self.scan_item is None:
self.update_with_scan_history(-1)
@@ -2453,6 +2636,8 @@ class Waveform(PlotBase):
Cleanup the widget by disconnecting signals and closing dialogs.
"""
self.proxy_dap_request.cleanup()
if self._alignment_controller is not None:
self._alignment_controller.cleanup()
self.clear_all()
if self.curve_settings_dialog is not None:
self.curve_settings_dialog.reject()

View File

@@ -0,0 +1,142 @@
from unittest.mock import MagicMock
import numpy as np
import pyqtgraph as pg
from bec_widgets.widgets.plots.waveform.utils.alignment_controller import (
AlignmentContext,
WaveformAlignmentController,
)
from bec_widgets.widgets.plots.waveform.utils.alignment_panel import WaveformAlignmentPanel
from .client_mocks import mocked_client
from .conftest import create_widget
from .test_waveform import make_alignment_fit_summary
def create_alignment_controller(qtbot, mocked_client):
plot_widget = pg.PlotWidget()
qtbot.addWidget(plot_widget)
panel = create_widget(qtbot, WaveformAlignmentPanel, client=mocked_client)
controller = WaveformAlignmentController(plot_widget.plotItem, panel, parent=plot_widget)
return plot_widget, panel, controller
def test_alignment_controller_shows_marker_only_when_visible(qtbot, mocked_client):
_, panel, controller = create_alignment_controller(qtbot, mocked_client)
controller.update_context(
AlignmentContext(visible=False, positioner_name="samx", precision=3, readback=1.0)
)
controller.update_position(4.2)
assert controller.marker_line is None
controller.update_context(
AlignmentContext(visible=True, positioner_name="samx", precision=3, readback=4.2)
)
assert controller.marker_line is not None
assert np.isclose(controller.marker_line.value(), 4.2)
assert panel.target_toggle.isEnabled() is True
controller.update_context(AlignmentContext(visible=False, positioner_name="samx"))
assert controller.marker_line is None
def test_alignment_controller_target_line_uses_readback_and_limits(qtbot, mocked_client):
_, panel, controller = create_alignment_controller(qtbot, mocked_client)
controller.update_context(
AlignmentContext(
visible=True, positioner_name="samx", precision=3, limits=(0.0, 2.0), readback=5.0
)
)
panel.target_toggle.setChecked(True)
assert controller.target_line is not None
assert np.isclose(controller.target_line.value(), 2.0)
panel.target_toggle.setChecked(False)
assert controller.target_line is None
controller.update_context(
AlignmentContext(
visible=True, positioner_name="samx", precision=3, limits=None, readback=5.0
)
)
panel.target_toggle.setChecked(True)
assert controller.target_line is not None
assert np.isclose(controller.target_line.value(), 5.0)
def test_alignment_controller_preserves_dragged_target_on_context_refresh(qtbot, mocked_client):
_, panel, controller = create_alignment_controller(qtbot, mocked_client)
controller.update_context(
AlignmentContext(
visible=True, positioner_name="samx", precision=3, limits=(0.0, 5.0), readback=1.0
)
)
panel.target_toggle.setChecked(True)
controller.target_line.setValue(3.0)
controller.update_context(
AlignmentContext(
visible=True, positioner_name="samx", precision=3, limits=(0.0, 5.0), readback=2.0
)
)
assert controller.target_line is not None
assert np.isclose(controller.target_line.value(), 3.0)
def test_alignment_controller_emits_move_request_for_fit_center(qtbot, mocked_client):
_, panel, controller = create_alignment_controller(qtbot, mocked_client)
move_callback = MagicMock()
controller.move_absolute_requested.connect(move_callback)
controller.update_context(
AlignmentContext(visible=True, positioner_name="samx", precision=3, readback=1.0)
)
controller.update_dap_summary(make_alignment_fit_summary(center=2.5), {"curve_id": "fit"})
assert panel.fit_dialog.action_buttons["center"].isEnabled() is True
panel.fit_dialog.action_buttons["center"].click()
move_callback.assert_called_once_with(2.5)
def test_alignment_controller_emits_move_request_for_target(qtbot, mocked_client):
_, panel, controller = create_alignment_controller(qtbot, mocked_client)
move_callback = MagicMock()
controller.move_absolute_requested.connect(move_callback)
controller.update_context(
AlignmentContext(
visible=True, positioner_name="samx", precision=3, limits=(0.0, 5.0), readback=1.0
)
)
panel.target_toggle.setChecked(True)
controller.target_line.setValue(1.25)
panel.move_to_target_button.click()
move_callback.assert_called_once_with(1.25)
def test_alignment_controller_removes_deleted_dap_curve(qtbot, mocked_client):
_, panel, controller = create_alignment_controller(qtbot, mocked_client)
controller.update_context(
AlignmentContext(visible=True, positioner_name="samx", precision=3, readback=1.0)
)
controller.update_dap_summary(make_alignment_fit_summary(center=1.5), {"curve_id": "fit"})
assert "fit" in panel.fit_dialog.summary_data
controller.remove_dap_curve("fit")
assert "fit" not in panel.fit_dialog.summary_data
assert panel.fit_dialog.fit_curve_id is None
assert panel.fit_dialog.enable_actions is False

View File

@@ -0,0 +1,40 @@
from unittest.mock import MagicMock
from bec_widgets.widgets.plots.waveform.utils.alignment_panel import WaveformAlignmentPanel
from .client_mocks import mocked_client
from .conftest import create_widget
def test_alignment_panel_forwards_position_and_target_signals(qtbot, mocked_client):
panel = create_widget(qtbot, WaveformAlignmentPanel, client=mocked_client)
position_callback = MagicMock()
target_toggle_callback = MagicMock()
target_move_callback = MagicMock()
panel.position_readback_changed.connect(position_callback)
panel.target_toggled.connect(target_toggle_callback)
panel.target_move_requested.connect(target_move_callback)
panel.positioner.position_update.emit(1.25)
panel.target_toggle.setChecked(True)
panel.move_to_target_button.setEnabled(True)
panel.move_to_target_button.click()
position_callback.assert_called_once_with(1.25)
target_toggle_callback.assert_called_once_with(True)
target_move_callback.assert_called_once_with()
def test_alignment_panel_emits_only_center_fit_actions(qtbot, mocked_client):
panel = create_widget(qtbot, WaveformAlignmentPanel, client=mocked_client)
fit_center_callback = MagicMock()
panel.fit_center_requested.connect(fit_center_callback)
panel.fit_dialog.move_action.emit(("sigma", 0.5))
fit_center_callback.assert_not_called()
panel.fit_dialog.move_action.emit(("center", 2.5))
fit_center_callback.assert_called_once_with(2.5)

View File

@@ -36,6 +36,22 @@ from .conftest import create_widget
##################################################
def make_alignment_fit_summary(center: float | None = None) -> dict:
params = []
if center is not None:
params.append(["center", center, True, None, -np.inf, np.inf, None, 0.1, {}, 0.0, None])
params.append(["sigma", 0.5, True, None, 0.0, np.inf, None, 0.1, {}, 1.0, None])
return {
"model": "Model(test)",
"method": "leastsq",
"chisqr": 1.0,
"redchi": 1.0,
"rsquared": 0.99,
"message": "Fit succeeded.",
"params": params,
}
def test_waveform_initialization(qtbot, mocked_client):
"""
Test that a new Waveform widget initializes with the correct defaults.
@@ -496,6 +512,218 @@ def test_add_dap_curve_custom_source(qtbot, mocked_client_with_dap):
assert dap_curve.config.signal.dap == "GaussianModel"
def test_alignment_mode_toggle_shows_bottom_panel(qtbot, mocked_client):
wf = create_widget(qtbot, Waveform, client=mocked_client)
action = wf.toolbar.components.get_action("alignment_mode").action
action.trigger()
assert wf._alignment_panel_visible is True
assert wf._alignment_side_panel.panel_visible is True
assert action.isChecked() is True
action.trigger()
assert wf._alignment_panel_visible is False
assert wf._alignment_side_panel.panel_visible is False
assert action.isChecked() is False
def test_resolve_alignment_positioner(qtbot, mocked_client):
wf = create_widget(qtbot, Waveform, client=mocked_client)
wf.x_mode = "samx"
assert wf._resolve_alignment_positioner() == "samx"
wf.x_mode = "auto"
wf._current_x_device = ("samx", "samx")
assert wf._resolve_alignment_positioner() == "samx"
wf._current_x_device = ("bpm4i", "bpm4i")
assert wf._resolve_alignment_positioner() is None
wf.x_mode = "index"
assert wf._resolve_alignment_positioner() is None
wf.x_mode = "timestamp"
assert wf._resolve_alignment_positioner() is None
def test_alignment_panel_updates_when_auto_x_motor_changes(
qtbot, mocked_client_with_dap, monkeypatch
):
wf = create_widget(qtbot, Waveform, client=mocked_client_with_dap)
wf.plot(arg1="bpm4i", dap="GaussianModel")
wf.x_mode = "auto"
wf.toolbar.components.get_action("alignment_mode").action.trigger()
wf._current_x_device = ("samx", "samx")
wf._alignment_panel.set_positioner_device("samx")
wf.scan_item = create_dummy_scan_item()
wf.scan_item.metadata["bec"]["scan_report_devices"] = ["samy"]
data = {
"samy": {"samy": {"val": np.array([1.0, 2.0, 3.0])}},
"bpm4i": {"bpm4i": {"val": np.array([10.0, 20.0, 30.0])}},
}
monkeypatch.setattr(wf, "_fetch_scan_data_and_access", lambda: (data, "val"))
wf._get_x_data("bpm4i", "bpm4i")
assert wf._current_x_device == ("samy", "samy")
assert wf._alignment_positioner_name == "samy"
assert wf._alignment_panel.positioner.device == "samy"
def test_alignment_panel_disables_without_positioner(qtbot, mocked_client_with_dap):
wf = create_widget(qtbot, Waveform, client=mocked_client_with_dap)
wf.plot(arg1="bpm4i")
wf.x_mode = "index"
wf.toolbar.components.get_action("alignment_mode").action.trigger()
assert wf._alignment_panel.positioner.isEnabled() is False
assert "positioner on the x axis" in wf._alignment_panel.status_label.text()
def test_alignment_marker_updates_from_positioner_readback(qtbot, mocked_client_with_dap):
wf = create_widget(qtbot, Waveform, client=mocked_client_with_dap)
wf.plot(arg1="bpm4i", dap="GaussianModel")
wf.x_mode = "samx"
wf.toolbar.components.get_action("alignment_mode").action.trigger()
wf.dev["samx"].signals["samx"]["value"] = 4.2
wf._alignment_panel.positioner.force_update_readback()
assert wf._alignment_controller is not None
assert wf._alignment_controller.marker_line is not None
assert np.isclose(wf._alignment_controller.marker_line.value(), 4.2)
assert "samx" in wf._alignment_controller.marker_line.label.toPlainText()
assert "4.200" in wf._alignment_controller.marker_line.label.toPlainText()
def test_alignment_panel_uses_existing_dap_curves_and_moves_positioner(
qtbot, mocked_client_with_dap
):
wf = create_widget(qtbot, Waveform, client=mocked_client_with_dap)
source_curve = wf.plot(arg1="bpm4i")
dap_curve = wf.add_dap_curve(device_label=source_curve.name(), dap_name="GaussianModel")
wf.x_mode = "samx"
wf.toolbar.components.get_action("alignment_mode").action.trigger()
fit_summary = make_alignment_fit_summary(center=2.5)
wf.dap_summary_update.emit(fit_summary, {"curve_id": dap_curve.name()})
wf._alignment_panel.fit_dialog.select_curve(dap_curve.name())
move_spy = MagicMock()
wf.dev["samx"].move = move_spy
assert wf._alignment_panel.fit_dialog.fit_curve_id == dap_curve.name()
assert wf._alignment_panel.fit_dialog.action_buttons["center"].isEnabled() is True
wf._alignment_panel.fit_dialog.action_buttons["center"].click()
move_spy.assert_called_once_with(2.5, relative=False)
def test_alignment_target_line_toggle_updates_target_value_label(qtbot, mocked_client_with_dap):
wf = create_widget(qtbot, Waveform, client=mocked_client_with_dap)
wf.plot(arg1="bpm4i", dap="GaussianModel")
wf.x_mode = "samx"
wf.toolbar.components.get_action("alignment_mode").action.trigger()
wf._alignment_panel.target_toggle.setChecked(True)
assert wf._alignment_controller is not None
assert wf._alignment_controller.target_line is not None
assert wf._alignment_panel.move_to_target_button.isEnabled() is True
wf._alignment_controller.target_line.setValue(1.5)
assert "1.500" in wf._alignment_panel.target_toggle.text()
def test_alignment_move_to_target_uses_draggable_line_value(qtbot, mocked_client_with_dap):
wf = create_widget(qtbot, Waveform, client=mocked_client_with_dap)
wf.plot(arg1="bpm4i", dap="GaussianModel")
wf.x_mode = "samx"
wf.toolbar.components.get_action("alignment_mode").action.trigger()
wf._alignment_panel.target_toggle.setChecked(True)
wf._alignment_controller.target_line.setValue(1.25)
move_spy = MagicMock()
wf.dev["samx"].move = move_spy
wf._alignment_panel.move_to_target_button.click()
move_spy.assert_called_once_with(1.25, relative=False)
def test_alignment_mode_toggle_off_keeps_user_dap_curve(qtbot, mocked_client_with_dap):
wf = create_widget(qtbot, Waveform, client=mocked_client_with_dap)
source_curve = wf.plot(arg1="bpm4i")
dap_curve = wf.add_dap_curve(device_label=source_curve.name(), dap_name="GaussianModel")
wf.x_mode = "samx"
action = wf.toolbar.components.get_action("alignment_mode").action
action.trigger()
action.trigger()
assert wf.get_curve(dap_curve.name()) is not None
def test_alignment_mode_toggle_off_clears_controller_overlays(qtbot, mocked_client_with_dap):
wf = create_widget(qtbot, Waveform, client=mocked_client_with_dap)
wf.plot(arg1="bpm4i", dap="GaussianModel")
wf.x_mode = "samx"
action = wf.toolbar.components.get_action("alignment_mode").action
action.trigger()
wf._alignment_panel.target_toggle.setChecked(True)
wf.dev["samx"].signals["samx"]["value"] = 2.0
wf._alignment_panel.positioner.force_update_readback()
assert wf._alignment_controller.marker_line is not None
assert wf._alignment_controller.target_line is not None
action.trigger()
assert wf._alignment_controller.marker_line is None
assert wf._alignment_controller.target_line is None
def test_alignment_panel_removes_deleted_dap_curve_from_fit_list(qtbot, mocked_client_with_dap):
wf = create_widget(qtbot, Waveform, client=mocked_client_with_dap)
source_curve = wf.plot(arg1="bpm4i")
dap_curve = wf.add_dap_curve(device_label=source_curve.name(), dap_name="GaussianModel")
wf.toolbar.components.get_action("alignment_mode").action.trigger()
wf.dap_summary_update.emit(
make_alignment_fit_summary(center=1.5), {"curve_id": dap_curve.name()}
)
assert dap_curve.name() in wf._alignment_panel.fit_dialog.summary_data
wf.remove_curve(dap_curve.name())
assert dap_curve.name() not in wf._alignment_panel.fit_dialog.summary_data
def test_alignment_controller_move_request_moves_positioner(qtbot, mocked_client_with_dap):
wf = create_widget(qtbot, Waveform, client=mocked_client_with_dap)
wf.plot(arg1="bpm4i", dap="GaussianModel")
wf.x_mode = "samx"
move_spy = MagicMock()
wf.dev["samx"].move = move_spy
wf.toolbar.components.get_action("alignment_mode").action.trigger()
wf._alignment_controller.move_absolute_requested.emit(3.5)
move_spy.assert_called_once_with(3.5, relative=False)
def test_curve_set_data_emits_dap_update(qtbot, mocked_client):
wf = create_widget(qtbot, Waveform, client=mocked_client)
c = wf.plot(x=[1, 2, 3], y=[4, 5, 6], label="test_curve")