feat(waveform): new type of curve - history curve

This commit is contained in:
2025-06-03 18:02:26 +02:00
committed by Jan Wyzula
parent 4be70580a6
commit f083dff612
6 changed files with 399 additions and 76 deletions
+6
View File
@@ -5079,6 +5079,8 @@ class Waveform(RPCBase):
color: "str | None" = None,
label: "str | None" = None,
dap: "str | None" = None,
scan_id: "str | None" = None,
scan_number: "int | None" = None,
**kwargs,
) -> "Curve":
"""
@@ -5101,6 +5103,10 @@ class Waveform(RPCBase):
dap(str): The dap model to use for the curve, only available for sync devices.
If not specified, none will be added.
Use the same string as is the name of the LMFit model.
scan_id(str): Optional scan ID. When provided, the curve is treated as a **history** curve and
the ydata (and optional xdata) are fetched from that historical scan. Such curves are
never cleared by livescan resets.
scan_number(int): Optional scan index. When provided, the curve is treated as a **history** curve and
Returns:
Curve: The curve object.
@@ -55,7 +55,7 @@ class JupyterConsoleWindow(QWidget): # pragma: no cover:
# "btn6": self.btn6,
# "pb": self.pb,
# "pi": self.pi,
# "wf": self.wf,
"wf": self.wf,
# "scatter": self.scatter,
# "scatter_mi": self.scatter,
# "mwf": self.mwf,
@@ -105,12 +105,11 @@ class JupyterConsoleWindow(QWidget): # pragma: no cover:
# self.btn5 = QPushButton("Button 5")
# self.btn6 = QPushButton("Button 6")
#
# fifth_tab = QWidget()
# fifth_tab_layout = QVBoxLayout(fifth_tab)
# self.wf = Waveform()
# fifth_tab_layout.addWidget(self.wf)
# tab_widget.addTab(fifth_tab, "Waveform Next Gen")
# tab_widget.setCurrentIndex(4)
fifth_tab = QWidget()
fifth_tab_layout = QVBoxLayout(fifth_tab)
self.wf = Waveform()
fifth_tab_layout.addWidget(self.wf)
tab_widget.addTab(fifth_tab, "Waveform Next Gen")
#
sixth_tab = QWidget()
sixth_tab_layout = QVBoxLayout(sixth_tab)
+7 -2
View File
@@ -42,10 +42,15 @@ class CurveConfig(ConnectionConfig):
pen_style: Literal["solid", "dash", "dot", "dashdot"] | None = Field(
"solid", description="The style of the pen of the curve."
)
source: Literal["device", "dap", "custom"] = Field(
source: Literal["device", "dap", "custom", "history"] = Field(
"custom", description="The source of the curve."
)
signal: DeviceSignal | None = Field(None, description="The signal of the curve.")
scan_id: str | None = Field(None, description="Scan ID to be used when `source` is 'history'.")
scan_number: int | None = Field(
None, description="Scan index to be used when `source` is 'history'."
)
current_x_mode: str | None = Field(None, description="The current x mode of the history curve.")
parent_label: str | None = Field(
None, description="The label of the parent plot, only relevant for dap curves."
)
@@ -199,7 +204,7 @@ class Curve(BECConnector, pg.PlotDataItem):
Raises:
ValueError: If the source is not custom.
"""
if self.config.source == "custom":
if self.config.source in ["custom", "history"]:
self.setData(x, y)
else:
raise ValueError(f"Source {self.config.source} do not allow custom data setting.")
@@ -5,7 +5,31 @@ from typing import TYPE_CHECKING
from bec_lib.logger import bec_logger
from bec_qthemes._icon.material_icons import material_icon
from qtpy.QtCore import Qt
from qtpy.QtGui import QValidator
class ScanIndexValidator(QValidator):
"""Validator to allow only 'live' or integer scan numbers within range."""
def __init__(self, max_scan: int, parent=None):
super().__init__(parent)
self.max_scan = max_scan
def validate(self, input_str: str, pos: int):
# Accept empty or 'live'
if input_str == "" or input_str == "live":
return QValidator.Acceptable, input_str, pos
# Allow partial editing of "live"
if "live".startswith(input_str):
return QValidator.Intermediate, input_str, pos
# Accept integer within [1, max_scan]
if input_str.isdigit():
num = int(input_str)
if 1 <= num <= self.max_scan:
return QValidator.Acceptable, input_str, pos
return QValidator.Invalid, input_str, pos
from qtpy.QtWidgets import (
QComboBox,
QHBoxLayout,
@@ -91,8 +115,32 @@ class CurveRow(QTreeWidgetItem):
# Create columns 1..2, depending on source
self._init_source_ui()
# Create columns 3..6 (color, style, width, symbol)
self._init_scan_index_ui()
self._init_style_controls()
def _init_scan_index_ui(self):
"""Create the Scan # editable combobox in column 3."""
if self.source not in ("device", "history"):
return
self.scan_index_combo = QComboBox()
self.scan_index_combo.setEditable(True)
# Populate 'live' and all available history scan indices
self.scan_index_combo.addItem("live")
history = getattr(self.curve_tree.client, "history", None)
num_scans = len(history) if history is not None else 0
# Restrict input to 'live' or valid scan numbers
validator = ScanIndexValidator(num_scans, self.scan_index_combo)
self.scan_index_combo.lineEdit().setValidator(validator)
for idx in range(num_scans):
# Display scan numbers starting at 1
self.scan_index_combo.addItem(str(idx + 1))
# Select current scan number if set, otherwise default to 'live'
if getattr(self.config, "scan_number", None) is not None:
self.scan_index_combo.setCurrentText(str(self.config.scan_number))
else:
self.scan_index_combo.setCurrentText("live")
self.tree.setItemWidget(self, 3, self.scan_index_combo)
def _init_actions(self):
"""Create the actions widget in column 0, including a delete button and maybe 'Add DAP'."""
self.actions_widget = QWidget()
@@ -114,7 +162,7 @@ class CurveRow(QTreeWidgetItem):
actions_layout.addWidget(self.delete_button)
# If device row, add "Add DAP" button
if self.source == "device":
if self.source in ("device", "history"):
self.add_dap_button = QPushButton("DAP")
self.add_dap_button.clicked.connect(lambda: self.add_dap_row())
actions_layout.addWidget(self.add_dap_button)
@@ -123,7 +171,7 @@ class CurveRow(QTreeWidgetItem):
def _init_source_ui(self):
"""Create columns 1 and 2. For device rows, we have device/entry edits; for dap rows, label/model combo."""
if self.source == "device":
if self.source in ("device", "history"):
# Device row: columns 1..2 are device line edits
self.device_edit = DeviceComboBox(parent=self.tree)
self.device_edit.insertItem(0, "")
@@ -152,7 +200,6 @@ class CurveRow(QTreeWidgetItem):
self.tree.setItemWidget(self, 1, self.device_edit)
self.tree.setItemWidget(self, 2, self.entry_edit)
else:
# DAP row: column1= "Model" label, column2= DapComboBox
self.label_widget = QLabel("Model")
@@ -171,31 +218,31 @@ class CurveRow(QTreeWidgetItem):
self.tree.setItemWidget(self, 2, self.dap_combo)
def _init_style_controls(self):
"""Create columns 3..6: color button, style combo, width spin, symbol spin."""
# Color in col 3
"""Create columns 4..7: color button, style combo, width spin, symbol spin."""
# Color in col 4
self.color_button = ColorButtonNative(color=self.config.color)
self.color_button.color_changed.connect(self._on_color_changed)
self.tree.setItemWidget(self, 3, self.color_button)
self.tree.setItemWidget(self, 4, self.color_button)
# Style in col 4
# Style in col 5
self.style_combo = QComboBox()
self.style_combo.addItems(["solid", "dash", "dot", "dashdot"])
idx = self.style_combo.findText(self.config.pen_style)
if idx >= 0:
self.style_combo.setCurrentIndex(idx)
self.tree.setItemWidget(self, 4, self.style_combo)
self.tree.setItemWidget(self, 5, self.style_combo)
# Pen width in col 5
# Pen width in col 6
self.width_spin = QSpinBox()
self.width_spin.setRange(1, 20)
self.width_spin.setValue(self.config.pen_width)
self.tree.setItemWidget(self, 5, self.width_spin)
self.tree.setItemWidget(self, 6, self.width_spin)
# Symbol size in col 6
# Symbol size in col 7
self.symbol_spin = QSpinBox()
self.symbol_spin.setRange(1, 20)
self.symbol_spin.setValue(self.config.symbol_size)
self.tree.setItemWidget(self, 6, self.symbol_spin)
self.tree.setItemWidget(self, 7, self.symbol_spin)
@SafeSlot(str, verify_sender=True)
def _on_color_changed(self, new_color: str):
@@ -209,8 +256,8 @@ class CurveRow(QTreeWidgetItem):
self.config.symbol_color = new_color
def add_dap_row(self):
"""Create a new DAP row as a child. Only valid if source='device'."""
if self.source != "device":
"""Create a new DAP row as a child. Only valid if source is 'device' or 'history'."""
if self.source not in ("device", "history"):
return
curve_tree = self.tree.parent()
parent_label = self.config.label
@@ -288,7 +335,7 @@ class CurveRow(QTreeWidgetItem):
Returns:
dict: The serialized config based on the GUI state.
"""
if self.source == "device":
if self.source in ("device", "history"):
# Gather device name/entry
device_name = ""
device_entry = ""
@@ -309,8 +356,23 @@ class CurveRow(QTreeWidgetItem):
)
self.config.signal = DeviceSignal(name=device_name, entry=device_entry)
self.config.source = "device"
self.config.label = f"{device_name}-{device_entry}"
scan_combo_text = self.scan_index_combo.currentText()
if scan_combo_text == "live" or scan_combo_text == "":
self.config.scan_number = None
self.config.scan_id = None
self.config.source = "device"
self.config.label = f"{device_name}-{device_entry}"
if scan_combo_text.isdigit():
try:
scan_num = int(scan_combo_text)
except ValueError:
scan_num = None
self.config.scan_number = scan_num
self.config.scan_id = None # has to be reset to fetch by scan number, not by scan id, can cause leak of old scan ids
self.config.source = "history"
# Label history curves with scan number suffix
if scan_num is not None:
self.config.label = f"{device_name}-{device_entry}-scan-{scan_num}"
else:
# DAP logic
parent_conf_dict = {}
@@ -443,10 +505,12 @@ class CurveTree(BECWidget, QWidget):
self.toolbar.show_bundles(["curve_tree"])
def _init_tree(self):
"""Initialize the QTreeWidget with 7 columns and compact widths."""
"""Initialize the QTreeWidget with 8 columns and compact widths."""
self.tree = QTreeWidget()
self.tree.setColumnCount(7)
self.tree.setHeaderLabels(["Actions", "Name", "Entry", "Color", "Style", "Width", "Symbol"])
self.tree.setColumnCount(8)
self.tree.setHeaderLabels(
["Actions", "Name", "Entry", "Scan #", "Color", "Style", "Width", "Symbol"]
)
header = self.tree.header()
for idx in range(self.tree.columnCount()):
@@ -456,10 +520,10 @@ class CurveTree(BECWidget, QWidget):
header.setSectionResizeMode(idx, QHeaderView.Fixed)
header.setStretchLastSection(False)
self.tree.setColumnWidth(0, 90)
self.tree.setColumnWidth(3, 70)
self.tree.setColumnWidth(4, 80)
self.tree.setColumnWidth(5, 50)
self.tree.setColumnWidth(4, 70)
self.tree.setColumnWidth(5, 80)
self.tree.setColumnWidth(6, 50)
self.tree.setColumnWidth(7, 50)
self.layout.addWidget(self.tree)
@@ -583,9 +647,9 @@ class CurveTree(BECWidget, QWidget):
self.tree.clear()
self.all_items = []
device_curves = [c for c in self.waveform.curves if c.config.source == "device"]
top_curves = [c for c in self.waveform.curves if c.config.source in ("device", "history")]
dap_curves = [c for c in self.waveform.curves if c.config.source == "dap"]
for dev in device_curves:
for dev in top_curves:
dr = CurveRow(self.tree, parent_item=None, config=dev.config, device_manager=self.dev)
for dap in dap_curves:
if dap.config.parent_label == dev.config.label:
+289 -40
View File
@@ -8,6 +8,7 @@ import numpy as np
import pyqtgraph as pg
from bec_lib import bec_logger, messages
from bec_lib.endpoints import MessageEndpoints
from bec_lib.scan_data_container import ScanDataContainer
from pydantic import Field, ValidationError, field_validator
from qtpy.QtCore import Qt, QTimer, Signal
from qtpy.QtWidgets import (
@@ -23,7 +24,6 @@ from qtpy.QtWidgets import (
QWidget,
)
from bec_lib.scan_data_container import ScanDataContainer
from bec_widgets.utils import ConnectionConfig
from bec_widgets.utils.bec_signal_proxy import BECSignalProxy
from bec_widgets.utils.colors import Colors, set_theme
@@ -36,6 +36,9 @@ from bec_widgets.widgets.plots.plot_base import PlotBase
from bec_widgets.widgets.plots.waveform.curve import Curve, CurveConfig, DeviceSignal
from bec_widgets.widgets.plots.waveform.settings.curve_settings.curve_setting import CurveSetting
from bec_widgets.widgets.plots.waveform.utils.roi_manager import WaveformROIManager
from bec_widgets.widgets.services.scan_history_browser.scan_history_browser import (
ScanHistoryBrowser,
)
logger = bec_logger.logger
@@ -164,6 +167,7 @@ class Waveform(PlotBase):
# Curve data
self._sync_curves = []
self._async_curves = []
self._history_curves = []
self._slice_index = None
self._dap_curves = []
self._mode = None
@@ -180,12 +184,14 @@ class Waveform(PlotBase):
"readout_priority": None,
"label_suffix": "",
}
self._current_x_device: tuple[str, str] | None = None
# Specific GUI elements
self._init_roi_manager()
self.dap_summary = None
self.dap_summary_dialog = None
self._add_fit_parameters_popup()
self.scan_history_dialog = None
self._add_waveform_specific_popup()
self._enable_roi_toolbar_action(False) # default state where are no dap curves
self._init_curve_dialog()
self.curve_settings_dialog = None
@@ -253,7 +259,7 @@ class Waveform(PlotBase):
super().add_side_menus()
self._add_dap_summary_side_menu()
def _add_fit_parameters_popup(self):
def _add_waveform_specific_popup(self):
"""
Add popups to the Waveform widget.
"""
@@ -263,11 +269,24 @@ class Waveform(PlotBase):
icon_name="monitoring", tooltip="Open Fit Parameters", checkable=True, parent=self
),
)
self.toolbar.components.add_safe(
"scan_history",
MaterialIconAction(
icon_name="manage_search",
tooltip="Open Scan History browser",
checkable=True,
parent=self,
),
)
self.toolbar.get_bundle("axis_popup").add_action("fit_params")
self.toolbar.get_bundle("axis_popup").add_action("scan_history")
self.toolbar.components.get_action("fit_params").action.triggered.connect(
self.show_dap_summary_popup
)
self.toolbar.components.get_action("scan_history").action.triggered.connect(
self.show_scan_history_popup
)
@SafeSlot()
def _reset_view(self):
@@ -415,6 +434,45 @@ class Waveform(PlotBase):
self.toolbar.components.get_action("roi_linear").action.setChecked(False)
self._roi_manager.toggle_roi(False)
################################################################################
# Scan History browser popup
# TODO this is so far quick implementation just as popup, we should make scan history also standalone widget later
def show_scan_history_popup(self):
"""
Show the scan history popup.
"""
scan_history_action = self.toolbar.components.get_action("scan_history").action
if self.scan_history_dialog is None or not self.scan_history_dialog.isVisible():
self.scan_history_widget = ScanHistoryBrowser(parent=self)
self.scan_history_dialog = QDialog(modal=False)
self.scan_history_dialog.setWindowTitle(f"{self.object_name} - Scan History Browser")
self.scan_history_dialog.layout = QVBoxLayout(self.scan_history_dialog)
self.scan_history_dialog.layout.addWidget(self.scan_history_widget)
self.scan_history_widget.scan_history_device_viewer.request_history_plot.connect(
lambda scan_id, device_name, signal_name: self.plot(
y_name=device_name, y_entry=signal_name, scan_id=scan_id
)
)
self.scan_history_dialog.finished.connect(self._scan_history_closed)
self.scan_history_dialog.show()
self.scan_history_dialog.resize(780, 320)
scan_history_action.setChecked(True)
else:
# If already open, bring it to the front
self.scan_history_dialog.raise_()
self.scan_history_dialog.activateWindow()
scan_history_action.setChecked(True) # keep it toggle
def _scan_history_closed(self):
"""
Slot for when the scan history dialog is closed.
"""
self.scan_history_widget.close()
self.scan_history_widget.deleteLater()
self.scan_history_dialog.deleteLater()
self.scan_history_dialog = None
self.toolbar.components.get_action("scan_history").action.setChecked(False)
################################################################################
# Dap Summary
@@ -504,8 +562,11 @@ class Waveform(PlotBase):
self.x_axis_mode["name"] = value
if value not in ["timestamp", "index", "auto"]:
self.x_axis_mode["entry"] = self.entry_validator.validate_signal(value, None)
self._current_x_device = (value, self.x_axis_mode["entry"])
self._switch_x_axis_item(mode=value)
self._refresh_x_label_suffix()
self._current_x_device = None
self._refresh_history_curves()
self._update_curve_visibility()
self.async_signal_update.emit()
self.sync_signal_update.emit()
self.plot_item.enableAutoRange(x=True)
@@ -533,7 +594,8 @@ class Waveform(PlotBase):
return
self.x_axis_mode["entry"] = self.entry_validator.validate_signal(self.x_mode, value)
self._switch_x_axis_item(mode="device")
self._refresh_x_label_suffix()
self._refresh_history_curves()
self._update_curve_visibility()
self.async_signal_update.emit()
self.sync_signal_update.emit()
self.plot_item.enableAutoRange(x=True)
@@ -674,6 +736,8 @@ class Waveform(PlotBase):
color: str | None = None,
label: str | None = None,
dap: str | None = None,
scan_id: str | None = None,
scan_number: int | None = None,
**kwargs,
) -> Curve:
"""
@@ -696,6 +760,10 @@ class Waveform(PlotBase):
dap(str): The dap model to use for the curve, only available for sync devices.
If not specified, none will be added.
Use the same string as is the name of the LMFit model.
scan_id(str): Optional scan ID. When provided, the curve is treated as a **history** curve and
the ydata (and optional xdata) are fetched from that historical scan. Such curves are
never cleared by livescan resets.
scan_number(int): Optional scan index. When provided, the curve is treated as a **history** curve and
Returns:
Curve: The curve object.
@@ -765,6 +833,8 @@ class Waveform(PlotBase):
label=label,
color=color,
source=source,
scan_id=scan_id,
scan_number=scan_number,
**kwargs,
)
@@ -772,6 +842,9 @@ class Waveform(PlotBase):
if source == "device":
config.signal = DeviceSignal(name=y_name, entry=y_entry)
if scan_id is not None or scan_number is not None:
config.source = "history"
# CREATE THE CURVE
curve = self._add_curve(config=config, x_data=x_data, y_data=y_data)
@@ -810,7 +883,7 @@ class Waveform(PlotBase):
device_curve = self._find_curve_by_label(device_label)
if not device_curve:
raise ValueError(f"No existing curve found with label '{device_label}'.")
if device_curve.config.source != "device":
if device_curve.config.source not in ("device", "history"):
raise ValueError(
f"Curve '{device_label}' is not a device curve. Only device curves can have DAP."
)
@@ -819,7 +892,7 @@ class Waveform(PlotBase):
dev_entry = device_curve.config.signal.entry
# 2) Build a label for the new DAP curve
dap_label = f"{dev_name}-{dev_entry}-{dap_name}"
dap_label = f"{device_label}-{dap_name}"
# 3) Possibly raise if the DAP curve already exists
if self._check_curve_id(dap_label):
@@ -872,7 +945,19 @@ class Waveform(PlotBase):
ValueError: If a duplicate curve label/config is found, or if
custom data is missing for `source='custom'`.
"""
scan_item: ScanDataContainer | None = None
if config.source == "history":
# Convert 1-based scan_number (from user/API) to 0-based list index
list_index = config.scan_number - 1 if config.scan_number is not None else None
scan_item = self.get_history_scan_item(scan_id=config.scan_id, scan_index=list_index)
config.scan_id = scan_item.metadata["bec"]["scan_id"]
# Preserve the 1-based scan_number for external labeling
config.scan_number = scan_item.metadata["bec"]["scan_number"]
label = config.label
if config.source == "history":
label = f"{config.signal.name}-{config.signal.entry}-scan-{config.scan_number}"
config.label = label
if not label:
# Fallback label
label = WidgetContainerUtils.generate_unique_name(
@@ -894,7 +979,7 @@ class Waveform(PlotBase):
raise ValueError("For 'custom' curves, x_data and y_data must be provided.")
# Actually create the Curve item
curve = self._add_curve_object(name=label, config=config)
curve = self._add_curve_object(name=label, config=config, scan_item=scan_item)
# If custom => set initial data
if config.source == "custom" and x_data is not None and y_data is not None:
@@ -911,6 +996,8 @@ class Waveform(PlotBase):
self.setup_dap_for_scan()
self.roi_enable.emit(True) # Enable the ROI toolbar action
self.request_dap() # Request DAP update directly without blocking proxy
if config.source == "history":
self._history_curves.append(curve)
QTimer.singleShot(
150, self.auto_range
@@ -918,24 +1005,173 @@ class Waveform(PlotBase):
return curve
def _add_curve_object(self, name: str, config: CurveConfig) -> Curve:
def _add_curve_object(
self, name: str, config: CurveConfig, scan_item: ScanDataContainer | None = None
) -> Curve | None:
"""
Low-level creation of the PlotDataItem (Curve) from a `CurveConfig`.
Args:
name (str): The name/label of the curve.
config (CurveConfig): Configuration model describing the curve.
scan_item (ScanDataContainer | None): Optional scan item for history curves.
Returns:
Curve: The newly created curve object, added to the plot.
"""
curve = Curve(config=config, name=name, parent_item=self)
self.plot_item.addItem(curve)
if scan_item is not None:
self._fetch_history_data_for_curve(curve, scan_item)
self._categorise_device_curves()
curve.visibleChanged.connect(self._refresh_crosshair_markers)
curve.visibleChanged.connect(self.auto_range)
return curve
def _fetch_history_data_for_curve(
self, curve: Curve, scan_item: ScanDataContainer
) -> Curve | None:
# Check if the data are already set
device = curve.config.signal.name
entry = curve.config.signal.entry
all_devices_used = scan_item._msg.stored_data_info
if all_devices_used is None:
curve.remove()
raise ValueError(
f"No stored data info found in scan item ID:{curve.config.scan_id} for curve '{curve.name()}'. "
f"Upgrade BEC to the latest version."
)
# 1. get y data
x_data, y_data = None, None
if device not in all_devices_used:
raise ValueError(f"Device '{device}' not found in scan item ID:{curve.config.scan_id}.")
if entry not in all_devices_used[device]:
raise ValueError(
f"Entry '{entry}' not found in device '{device}' in scan item ID:{curve.config.scan_id}."
)
y_shape = all_devices_used.get(device).get(entry).shape[0]
# Determine X-axis data
if self.x_axis_mode["name"] == "index":
x_data = np.arange(y_shape)
curve.config.current_x_mode = "index"
self._update_x_label_suffix(" (index)")
elif self.x_axis_mode["name"] == "timestamp":
y_device = scan_item.devices.get(device)
x_data = y_device.get(entry).read().get("timestamp")
curve.config.current_x_mode = "timestamp"
self._update_x_label_suffix(" (timestamp)")
elif self.x_axis_mode["name"] not in ("index", "timestamp", "auto"): # Custom device mode
if self.x_axis_mode["name"] not in all_devices_used:
logger.warning(
f"Custom device '{self.x_axis_mode['name']}' not found in scan item of history curve '{curve.name()}'; scan ID: {curve.config.scan_id}."
)
curve.setVisible(False)
return
x_entry_custom = self.x_axis_mode.get("entry")
if x_entry_custom is None:
x_entry_custom = self.entry_validator.validate_signal(
self.x_axis_mode["name"], None
)
if x_entry_custom not in all_devices_used[self.x_axis_mode["name"]]:
logger.warning(
f"Custom entry '{x_entry_custom}' for device '{self.x_axis_mode['name']}' not found in scan item of history curve '{curve.name()}'; scan ID: {curve.config.scan_id}."
)
curve.setVisible(False)
return
x_shape = (
scan_item._msg.stored_data_info.get(self.x_axis_mode["name"])
.get(x_entry_custom)
.shape[0]
)
if x_shape != y_shape:
logger.warning(
f"Shape mismatch for x data '{x_shape}' and y data '{y_shape}' in history curve '{curve.name()}'; scan ID: {curve.config.scan_id}."
)
curve.setVisible(False)
return
x_device = scan_item.devices.get(self.x_axis_mode["name"])
x_data = x_device.get(x_entry_custom).read().get("value")
curve.config.current_x_mode = self.x_axis_mode["name"]
self._update_x_label_suffix(f" (custom: {self.x_axis_mode['name']}-{x_entry_custom})")
elif self.x_axis_mode["name"] == "auto":
if (
self._current_x_device is None
): # Scenario where no x device is set yet, because there was no live scan done in this widget yet
# If no current x device, use the first motor from scan item
scan_motors = self._ensure_str_list(
scan_item.metadata.get("bec").get("scan_report_devices")
)
if not scan_motors: # scan was done without reported motor from whatever reason
x_data = np.arange(y_shape) # Fallback to index
y_data = scan_item.devices.get(device).get(entry).read().get("value")
curve.set_data(x=x_data, y=y_data)
self._update_x_label_suffix(" (auto: index)")
return curve
x_entry = self.entry_validator.validate_signal(scan_motors[0], None)
if x_entry not in all_devices_used.get(scan_motors[0], {}):
logger.warning(
f"Auto x entry '{x_entry}' for device '{scan_motors[0]}' not found in scan item of history curve '{curve.name()}'; scan ID: {curve.config.scan_id}."
)
curve.setVisible(False)
return
if y_shape != all_devices_used.get(scan_motors[0]).get(x_entry, {}).shape[0]:
logger.warning(
f"Shape mismatch for x data '{all_devices_used.get(scan_motors[0]).get(x_entry, {}).get('shape', [0])[0]}' and y data '{y_shape}' in history curve '{curve.name()}'; scan ID: {curve.config.scan_id}."
)
curve.setVisible(False)
return
x_data = scan_item.devices.get(scan_motors[0]).get(x_entry).read().get("value")
self._current_x_device = (scan_motors[0], x_entry)
self._update_x_label_suffix(f" (auto: {scan_motors[0]}-{x_entry})")
curve.config.current_x_mode = "auto"
self._update_x_label_suffix(f" (auto: {scan_motors[0]}-{x_entry})")
else: # Scan in auto mode was done and live scan already set the current x device
if self._current_x_device[0] not in all_devices_used:
logger.warning(
f"Auto x data for device '{self._current_x_device[0]}' "
f"and entry '{self._current_x_device[1]}'"
f" not found in scan item of the history curve {curve.name()}."
)
curve.setVisible(False)
return
x_device = scan_item.devices.get(self._current_x_device[0])
x_data = x_device.get(self._current_x_device[1]).read().get("value")
curve.config.current_x_mode = "auto"
self._update_x_label_suffix(
f" (auto: {self._current_x_device[0]}-{self._current_x_device[1]})"
)
if x_data is None:
logger.warning(
f"X data for curve '{curve.name()}' could not be determined. "
f"Check if the x_mode '{self.x_axis_mode['name']}' is valid for the scan item."
)
curve.setVisible(False)
return
if y_data is None:
y_data = scan_item.devices.get(device).get(entry).read().get("value")
if y_data is None:
logger.warning(
f"Y data for curve '{curve.name()}' could not be determined. "
f"Check if the device '{device}' and entry '{entry}' are valid for the scan item."
)
curve.setVisible(False)
return
curve.set_data(x=x_data, y=y_data)
return curve
def _refresh_history_curves(self):
for curve in self._history_curves:
scan_item = self.get_history_scan_item(
scan_id=curve.config.scan_id, scan_index=curve.config.scan_number
)
if scan_item is not None:
self._fetch_history_data_for_curve(curve, scan_item)
else:
logger.warning(f"Scan item for curve {curve.name()} not found.")
def _refresh_crosshair_markers(self):
"""
Refresh the crosshair markers when a curve visibility changes.
@@ -970,7 +1206,42 @@ class Waveform(PlotBase):
Clear all data from the plot widget, but keep the curve references.
"""
for c in self.curves:
c.clear_data()
if c.config.source != "history":
c.clear_data()
# X-axis compatibility helpers
def _is_curve_compatible(self, curve: Curve) -> bool:
"""
Return True when *curve* can be shown with the current x-axis mode.
- index, timestamp are always compatible.
- For history curves we check whether the requested motor
(self.x_axis_mode["name"]) exists in the cached
history_data_buffer["x"] dictionary.
- DAP is done by checking if the parent curve is visible.
- Device curves are fetched by update sync/async curves, which solves the compatibility there.
"""
mode = self.x_axis_mode.get("name", "index")
if mode in ("index", "timestamp"): # always compatible - wild west mode
return True
if curve.config.source == "history":
scan_item = self.get_history_scan_item(
scan_id=curve.config.scan_id, scan_index=curve.config.scan_number
)
curve = self._fetch_history_data_for_curve(curve, scan_item)
if curve is None:
return False
if curve.config.source == "dap":
parent_curve = self._find_curve_by_label(curve.config.parent_label)
if parent_curve.isVisible():
return True
return False # DAP curve is not compatible if parent curve is not visible
return True
def _update_curve_visibility(self) -> None:
"""Show or hide curves according to `_is_curve_compatible`."""
for c in self.curves:
c.setVisible(self._is_curve_compatible(c))
def clear_all(self):
"""
@@ -1133,7 +1404,7 @@ class Waveform(PlotBase):
self.scan_id = current_scan_id
self.scan_item = self.queue.scan_storage.find_scan_by_ID(self.scan_id) # live scan
self._slice_index = None # Reset the slice index
self._update_curve_visibility()
self._mode = self._categorise_device_curves()
# First trigger to sync and async data
@@ -1619,6 +1890,7 @@ class Waveform(PlotBase):
entry_obj = data.get(x_name, {}).get(x_entry)
x_data = entry_obj.read()["value"] if entry_obj else [0]
new_suffix = f" (custom: {x_name}-{x_entry})"
self._current_x_device = (x_name, x_entry)
# 2 User wants timestamp
if self.x_axis_mode["name"] == "timestamp":
@@ -1633,11 +1905,13 @@ class Waveform(PlotBase):
timestamps = entry_obj.read()["timestamp"] if entry_obj else [0]
x_data = timestamps
new_suffix = " (timestamp)"
self._current_x_device = None
# 3 User wants index
if self.x_axis_mode["name"] == "index":
x_data = None
new_suffix = " (index)"
self._current_x_device = None
# 4 Best effort automatic mode
if self.x_axis_mode["name"] is None or self.x_axis_mode["name"] == "auto":
@@ -1645,6 +1919,7 @@ class Waveform(PlotBase):
if len(self._async_curves) > 0:
x_data = None
new_suffix = " (auto: index)"
self._current_x_device = None
# 4.2 If there are sync curves, use the first device from the scan report
else:
try:
@@ -1667,6 +1942,7 @@ class Waveform(PlotBase):
entry_obj = data.get(x_name, {}).get(x_entry)
x_data = entry_obj.read()["value"] if entry_obj else None
new_suffix = f" (auto: {x_name}-{x_entry})"
self._current_x_device = (x_name, x_entry)
self._update_x_label_suffix(new_suffix)
return x_data
@@ -1683,30 +1959,6 @@ class Waveform(PlotBase):
self.x_axis_mode["label_suffix"] = new_suffix
self.set_x_label_suffix(new_suffix)
def _refresh_x_label_suffix(self):
"""
Update the xaxis label suffix immediately based on the current
``x_axis_mode`` dictionary, without waiting for the next data
update cycle.
"""
mode = self.x_axis_mode.get("name", "auto")
if mode == "timestamp":
suffix = " (timestamp)"
elif mode == "index":
suffix = " (index)"
elif mode == "auto":
suffix = " (auto)"
else:
x_name = mode
x_entry = self.x_axis_mode.get("entry", None)
if x_entry:
suffix = f" (custom: {x_name}-{x_entry})"
else:
suffix = f" (custom: {x_name})"
self._update_x_label_suffix(suffix)
def _switch_x_axis_item(self, mode: str):
"""
Switch the x-axis mode between timestamp, index, the best effort and custom signal.
@@ -1808,7 +2060,7 @@ class Waveform(PlotBase):
ScanDataContainer | None: The fetched scan item or None if no item was found.
"""
if scan_index is not None and scan_id is not None:
raise ValueError("Only one of scan_id or scan_index can be provided.")
scan_index = None # Only one of scan_id or scan_index can be provided -> scan id is more reliable
if scan_index is None and scan_id is None:
logger.warning("Neither scan_id or scan_number was provided, fetching the latest scan")
@@ -1843,9 +2095,6 @@ class Waveform(PlotBase):
scan_id(str, optional): ScanID of the scan to be updated. Defaults to None.
scan_index(int, optional): Index of the scan to be updated. Defaults to None.
"""
if scan_index is not None and scan_id is not None:
raise ValueError("Only one of scan_id or scan_index can be provided.")
self.scan_item = self.get_history_scan_item(scan_index=scan_index, scan_id=scan_id)
if self.scan_item is None:
@@ -1854,7 +2103,7 @@ class Waveform(PlotBase):
if scan_id is not None:
self.scan_id = scan_id
else:
# If scan_index was used, set the scan_id from the fetched item
# If scan_number was used, set the scan_id from the fetched item
if hasattr(self.scan_item, "metadata"):
self.scan_id = self.scan_item.metadata["bec"]["scan_id"]
else:
+1 -1
View File
@@ -155,7 +155,7 @@ def test_curve_tree_init(curve_tree_fixture):
curve_tree, wf = curve_tree_fixture
assert curve_tree.waveform == wf
assert curve_tree.color_palette == "plasma"
assert curve_tree.tree.columnCount() == 7
assert curve_tree.tree.columnCount() == 8
assert curve_tree.toolbar.components.exists("add")
assert curve_tree.toolbar.components.exists("expand")