mirror of
https://github.com/bec-project/bec_widgets.git
synced 2025-07-13 11:11:49 +02:00
feat(waveform): new type of curve - history curve
This commit is contained in:
@ -4726,6 +4726,8 @@ class Waveform(RPCBase):
|
||||
color: "str | None" = None,
|
||||
label: "str | None" = None,
|
||||
dap: "str | None" = None,
|
||||
scan_id: "str | None" = None,
|
||||
scan_number: "int | None" = None,
|
||||
**kwargs,
|
||||
) -> "Curve":
|
||||
"""
|
||||
@ -4748,6 +4750,10 @@ class Waveform(RPCBase):
|
||||
dap(str): The dap model to use for the curve, only available for sync devices.
|
||||
If not specified, none will be added.
|
||||
Use the same string as is the name of the LMFit model.
|
||||
scan_id(str): Optional scan ID. When provided, the curve is treated as a **history** curve and
|
||||
the y‑data (and optional x‑data) are fetched from that historical scan. Such curves are
|
||||
never cleared by live‑scan resets.
|
||||
scan_number(int): Optional scan index. When provided, the curve is treated as a **history** curve and
|
||||
|
||||
Returns:
|
||||
Curve: The curve object.
|
||||
|
@ -55,7 +55,7 @@ class JupyterConsoleWindow(QWidget): # pragma: no cover:
|
||||
# "btn6": self.btn6,
|
||||
# "pb": self.pb,
|
||||
# "pi": self.pi,
|
||||
# "wf": self.wf,
|
||||
"wf": self.wf,
|
||||
# "scatter": self.scatter,
|
||||
# "scatter_mi": self.scatter,
|
||||
# "mwf": self.mwf,
|
||||
@ -105,12 +105,11 @@ class JupyterConsoleWindow(QWidget): # pragma: no cover:
|
||||
# self.btn5 = QPushButton("Button 5")
|
||||
# self.btn6 = QPushButton("Button 6")
|
||||
#
|
||||
# fifth_tab = QWidget()
|
||||
# fifth_tab_layout = QVBoxLayout(fifth_tab)
|
||||
# self.wf = Waveform()
|
||||
# fifth_tab_layout.addWidget(self.wf)
|
||||
# tab_widget.addTab(fifth_tab, "Waveform Next Gen")
|
||||
# tab_widget.setCurrentIndex(4)
|
||||
fifth_tab = QWidget()
|
||||
fifth_tab_layout = QVBoxLayout(fifth_tab)
|
||||
self.wf = Waveform()
|
||||
fifth_tab_layout.addWidget(self.wf)
|
||||
tab_widget.addTab(fifth_tab, "Waveform Next Gen")
|
||||
#
|
||||
sixth_tab = QWidget()
|
||||
sixth_tab_layout = QVBoxLayout(sixth_tab)
|
||||
|
@ -42,10 +42,16 @@ class CurveConfig(ConnectionConfig):
|
||||
pen_style: Literal["solid", "dash", "dot", "dashdot"] | None = Field(
|
||||
"solid", description="The style of the pen of the curve."
|
||||
)
|
||||
source: Literal["device", "dap", "custom"] = Field(
|
||||
source: Literal["device", "dap", "custom", "history"] = Field(
|
||||
"custom", description="The source of the curve."
|
||||
)
|
||||
signal: DeviceSignal | None = Field(None, description="The signal of the curve.")
|
||||
scan_id: str | None = Field(None, description="Scan ID to be used when `source` is 'history'.")
|
||||
scan_number: int | None = Field(
|
||||
None, description="Scan index to be used when `source` is 'history'."
|
||||
)
|
||||
# TODO have to decide if make sense
|
||||
current_x_mode: str | None = Field(None, description="The current x mode of the history curve.")
|
||||
parent_label: str | None = Field(
|
||||
None, description="The label of the parent plot, only relevant for dap curves."
|
||||
)
|
||||
@ -199,7 +205,7 @@ class Curve(BECConnector, pg.PlotDataItem):
|
||||
Raises:
|
||||
ValueError: If the source is not custom.
|
||||
"""
|
||||
if self.config.source == "custom":
|
||||
if self.config.source in ["custom", "history"]:
|
||||
self.setData(x, y)
|
||||
else:
|
||||
raise ValueError(f"Source {self.config.source} do not allow custom data setting.")
|
||||
|
@ -5,7 +5,31 @@ from typing import TYPE_CHECKING
|
||||
|
||||
from bec_lib.logger import bec_logger
|
||||
from bec_qthemes._icon.material_icons import material_icon
|
||||
from qtpy.QtCore import Qt
|
||||
from qtpy.QtGui import QValidator
|
||||
|
||||
|
||||
class ScanIndexValidator(QValidator):
|
||||
"""Validator to allow only 'live' or integer scan numbers within range."""
|
||||
|
||||
def __init__(self, max_scan: int, parent=None):
|
||||
super().__init__(parent)
|
||||
self.max_scan = max_scan
|
||||
|
||||
def validate(self, input_str: str, pos: int):
|
||||
# Accept empty or 'live'
|
||||
if input_str == "" or input_str == "live":
|
||||
return QValidator.Acceptable, input_str, pos
|
||||
# Allow partial editing of "live"
|
||||
if "live".startswith(input_str):
|
||||
return QValidator.Intermediate, input_str, pos
|
||||
# Accept integer within [1, max_scan]
|
||||
if input_str.isdigit():
|
||||
num = int(input_str)
|
||||
if 1 <= num <= self.max_scan:
|
||||
return QValidator.Acceptable, input_str, pos
|
||||
return QValidator.Invalid, input_str, pos
|
||||
|
||||
|
||||
from qtpy.QtWidgets import (
|
||||
QComboBox,
|
||||
QHBoxLayout,
|
||||
@ -91,8 +115,32 @@ class CurveRow(QTreeWidgetItem):
|
||||
# Create columns 1..2, depending on source
|
||||
self._init_source_ui()
|
||||
# Create columns 3..6 (color, style, width, symbol)
|
||||
self._init_scan_index_ui()
|
||||
self._init_style_controls()
|
||||
|
||||
def _init_scan_index_ui(self):
|
||||
"""Create the Scan # editable combobox in column 3."""
|
||||
if self.source not in ("device", "history"):
|
||||
return
|
||||
self.scan_index_combo = QComboBox()
|
||||
self.scan_index_combo.setEditable(True)
|
||||
# Populate 'live' and all available history scan indices
|
||||
self.scan_index_combo.addItem("live")
|
||||
history = getattr(self.curve_tree.client, "history", None)
|
||||
num_scans = len(history) if history is not None else 0
|
||||
# Restrict input to 'live' or valid scan numbers
|
||||
validator = ScanIndexValidator(num_scans, self.scan_index_combo)
|
||||
self.scan_index_combo.lineEdit().setValidator(validator)
|
||||
for idx in range(num_scans):
|
||||
# Display scan numbers starting at 1
|
||||
self.scan_index_combo.addItem(str(idx + 1))
|
||||
# Select current scan number if set, otherwise default to 'live'
|
||||
if getattr(self.config, "scan_number", None) is not None:
|
||||
self.scan_index_combo.setCurrentText(str(self.config.scan_number))
|
||||
else:
|
||||
self.scan_index_combo.setCurrentText("live")
|
||||
self.tree.setItemWidget(self, 3, self.scan_index_combo)
|
||||
|
||||
def _init_actions(self):
|
||||
"""Create the actions widget in column 0, including a delete button and maybe 'Add DAP'."""
|
||||
self.actions_widget = QWidget()
|
||||
@ -114,7 +162,7 @@ class CurveRow(QTreeWidgetItem):
|
||||
actions_layout.addWidget(self.delete_button)
|
||||
|
||||
# If device row, add "Add DAP" button
|
||||
if self.source == "device":
|
||||
if self.source in ("device", "history"):
|
||||
self.add_dap_button = QPushButton("DAP")
|
||||
self.add_dap_button.clicked.connect(lambda: self.add_dap_row())
|
||||
actions_layout.addWidget(self.add_dap_button)
|
||||
@ -123,7 +171,7 @@ class CurveRow(QTreeWidgetItem):
|
||||
|
||||
def _init_source_ui(self):
|
||||
"""Create columns 1 and 2. For device rows, we have device/entry edits; for dap rows, label/model combo."""
|
||||
if self.source == "device":
|
||||
if self.source in ("device", "history"):
|
||||
# Device row: columns 1..2 are device line edits
|
||||
self.device_edit = DeviceComboBox(parent=self.tree)
|
||||
self.device_edit.insertItem(0, "")
|
||||
@ -152,7 +200,6 @@ class CurveRow(QTreeWidgetItem):
|
||||
|
||||
self.tree.setItemWidget(self, 1, self.device_edit)
|
||||
self.tree.setItemWidget(self, 2, self.entry_edit)
|
||||
|
||||
else:
|
||||
# DAP row: column1= "Model" label, column2= DapComboBox
|
||||
self.label_widget = QLabel("Model")
|
||||
@ -171,31 +218,31 @@ class CurveRow(QTreeWidgetItem):
|
||||
self.tree.setItemWidget(self, 2, self.dap_combo)
|
||||
|
||||
def _init_style_controls(self):
|
||||
"""Create columns 3..6: color button, style combo, width spin, symbol spin."""
|
||||
# Color in col 3
|
||||
"""Create columns 4..7: color button, style combo, width spin, symbol spin."""
|
||||
# Color in col 4
|
||||
self.color_button = ColorButtonNative(color=self.config.color)
|
||||
self.color_button.color_changed.connect(self._on_color_changed)
|
||||
self.tree.setItemWidget(self, 3, self.color_button)
|
||||
self.tree.setItemWidget(self, 4, self.color_button)
|
||||
|
||||
# Style in col 4
|
||||
# Style in col 5
|
||||
self.style_combo = QComboBox()
|
||||
self.style_combo.addItems(["solid", "dash", "dot", "dashdot"])
|
||||
idx = self.style_combo.findText(self.config.pen_style)
|
||||
if idx >= 0:
|
||||
self.style_combo.setCurrentIndex(idx)
|
||||
self.tree.setItemWidget(self, 4, self.style_combo)
|
||||
self.tree.setItemWidget(self, 5, self.style_combo)
|
||||
|
||||
# Pen width in col 5
|
||||
# Pen width in col 6
|
||||
self.width_spin = QSpinBox()
|
||||
self.width_spin.setRange(1, 20)
|
||||
self.width_spin.setValue(self.config.pen_width)
|
||||
self.tree.setItemWidget(self, 5, self.width_spin)
|
||||
self.tree.setItemWidget(self, 6, self.width_spin)
|
||||
|
||||
# Symbol size in col 6
|
||||
# Symbol size in col 7
|
||||
self.symbol_spin = QSpinBox()
|
||||
self.symbol_spin.setRange(1, 20)
|
||||
self.symbol_spin.setValue(self.config.symbol_size)
|
||||
self.tree.setItemWidget(self, 6, self.symbol_spin)
|
||||
self.tree.setItemWidget(self, 7, self.symbol_spin)
|
||||
|
||||
@SafeSlot(str, verify_sender=True)
|
||||
def _on_color_changed(self, new_color: str):
|
||||
@ -209,8 +256,8 @@ class CurveRow(QTreeWidgetItem):
|
||||
self.config.symbol_color = new_color
|
||||
|
||||
def add_dap_row(self):
|
||||
"""Create a new DAP row as a child. Only valid if source='device'."""
|
||||
if self.source != "device":
|
||||
"""Create a new DAP row as a child. Only valid if source is 'device' or 'history'."""
|
||||
if self.source not in ("device", "history"):
|
||||
return
|
||||
curve_tree = self.tree.parent()
|
||||
parent_label = self.config.label
|
||||
@ -288,7 +335,7 @@ class CurveRow(QTreeWidgetItem):
|
||||
Returns:
|
||||
dict: The serialized config based on the GUI state.
|
||||
"""
|
||||
if self.source == "device":
|
||||
if self.source in ("device", "history"):
|
||||
# Gather device name/entry
|
||||
device_name = ""
|
||||
device_entry = ""
|
||||
@ -309,8 +356,22 @@ class CurveRow(QTreeWidgetItem):
|
||||
)
|
||||
|
||||
self.config.signal = DeviceSignal(name=device_name, entry=device_entry)
|
||||
self.config.source = "device"
|
||||
self.config.label = f"{device_name}-{device_entry}"
|
||||
scan_combo_text = self.scan_index_combo.currentText()
|
||||
if scan_combo_text == "live" or scan_combo_text == "":
|
||||
self.config.scan_number = None
|
||||
self.config.scan_id = None
|
||||
self.config.source = "device"
|
||||
self.config.label = f"{device_name}-{device_entry}"
|
||||
if scan_combo_text.isdigit():
|
||||
try:
|
||||
scan_num = int(scan_combo_text)
|
||||
except ValueError:
|
||||
scan_num = None
|
||||
self.config.scan_number = scan_num
|
||||
self.config.source = "history"
|
||||
# Label history curves with scan number suffix
|
||||
if scan_num is not None:
|
||||
self.config.label = f"{device_name}-{device_entry}-scan-{scan_num}"
|
||||
else:
|
||||
# DAP logic
|
||||
parent_conf_dict = {}
|
||||
@ -443,10 +504,12 @@ class CurveTree(BECWidget, QWidget):
|
||||
self.toolbar.show_bundles(["curve_tree"])
|
||||
|
||||
def _init_tree(self):
|
||||
"""Initialize the QTreeWidget with 7 columns and compact widths."""
|
||||
"""Initialize the QTreeWidget with 8 columns and compact widths."""
|
||||
self.tree = QTreeWidget()
|
||||
self.tree.setColumnCount(7)
|
||||
self.tree.setHeaderLabels(["Actions", "Name", "Entry", "Color", "Style", "Width", "Symbol"])
|
||||
self.tree.setColumnCount(8)
|
||||
self.tree.setHeaderLabels(
|
||||
["Actions", "Name", "Entry", "Scan #", "Color", "Style", "Width", "Symbol"]
|
||||
)
|
||||
|
||||
header = self.tree.header()
|
||||
for idx in range(self.tree.columnCount()):
|
||||
@ -456,10 +519,10 @@ class CurveTree(BECWidget, QWidget):
|
||||
header.setSectionResizeMode(idx, QHeaderView.Fixed)
|
||||
header.setStretchLastSection(False)
|
||||
self.tree.setColumnWidth(0, 90)
|
||||
self.tree.setColumnWidth(3, 70)
|
||||
self.tree.setColumnWidth(4, 80)
|
||||
self.tree.setColumnWidth(5, 50)
|
||||
self.tree.setColumnWidth(4, 70)
|
||||
self.tree.setColumnWidth(5, 80)
|
||||
self.tree.setColumnWidth(6, 50)
|
||||
self.tree.setColumnWidth(7, 50)
|
||||
|
||||
self.layout.addWidget(self.tree)
|
||||
|
||||
@ -583,9 +646,9 @@ class CurveTree(BECWidget, QWidget):
|
||||
self.tree.clear()
|
||||
self.all_items = []
|
||||
|
||||
device_curves = [c for c in self.waveform.curves if c.config.source == "device"]
|
||||
top_curves = [c for c in self.waveform.curves if c.config.source in ("device", "history")]
|
||||
dap_curves = [c for c in self.waveform.curves if c.config.source == "dap"]
|
||||
for dev in device_curves:
|
||||
for dev in top_curves:
|
||||
dr = CurveRow(self.tree, parent_item=None, config=dev.config, device_manager=self.dev)
|
||||
for dap in dap_curves:
|
||||
if dap.config.parent_label == dev.config.label:
|
||||
|
@ -8,6 +8,7 @@ import numpy as np
|
||||
import pyqtgraph as pg
|
||||
from bec_lib import bec_logger, messages
|
||||
from bec_lib.endpoints import MessageEndpoints
|
||||
from bec_lib.scan_data_container import ScanDataContainer
|
||||
from pydantic import Field, ValidationError, field_validator
|
||||
from qtpy.QtCore import Qt, QTimer, Signal
|
||||
from qtpy.QtWidgets import (
|
||||
@ -23,7 +24,6 @@ from qtpy.QtWidgets import (
|
||||
QWidget,
|
||||
)
|
||||
|
||||
from bec_lib.scan_data_container import ScanDataContainer
|
||||
from bec_widgets.utils import ConnectionConfig
|
||||
from bec_widgets.utils.bec_signal_proxy import BECSignalProxy
|
||||
from bec_widgets.utils.colors import Colors, set_theme
|
||||
@ -163,6 +163,7 @@ class Waveform(PlotBase):
|
||||
# Curve data
|
||||
self._sync_curves = []
|
||||
self._async_curves = []
|
||||
self._history_curves = []
|
||||
self._slice_index = None
|
||||
self._dap_curves = []
|
||||
self._mode = None
|
||||
@ -179,6 +180,7 @@ class Waveform(PlotBase):
|
||||
"readout_priority": None,
|
||||
"label_suffix": "",
|
||||
}
|
||||
self._current_x_device: tuple[str, str] | None = None
|
||||
|
||||
# Specific GUI elements
|
||||
self._init_roi_manager()
|
||||
@ -503,8 +505,11 @@ class Waveform(PlotBase):
|
||||
self.x_axis_mode["name"] = value
|
||||
if value not in ["timestamp", "index", "auto"]:
|
||||
self.x_axis_mode["entry"] = self.entry_validator.validate_signal(value, None)
|
||||
self._current_x_device = (value, self.x_axis_mode["entry"])
|
||||
self._switch_x_axis_item(mode=value)
|
||||
self._refresh_x_label_suffix()
|
||||
self._current_x_device = None
|
||||
self._refresh_history_curves()
|
||||
self._update_curve_visibility()
|
||||
self.async_signal_update.emit()
|
||||
self.sync_signal_update.emit()
|
||||
self.plot_item.enableAutoRange(x=True)
|
||||
@ -532,7 +537,8 @@ class Waveform(PlotBase):
|
||||
return
|
||||
self.x_axis_mode["entry"] = self.entry_validator.validate_signal(self.x_mode, value)
|
||||
self._switch_x_axis_item(mode="device")
|
||||
self._refresh_x_label_suffix()
|
||||
self._refresh_history_curves()
|
||||
self._update_curve_visibility()
|
||||
self.async_signal_update.emit()
|
||||
self.sync_signal_update.emit()
|
||||
self.plot_item.enableAutoRange(x=True)
|
||||
@ -673,6 +679,8 @@ class Waveform(PlotBase):
|
||||
color: str | None = None,
|
||||
label: str | None = None,
|
||||
dap: str | None = None,
|
||||
scan_id: str | None = None,
|
||||
scan_number: int | None = None,
|
||||
**kwargs,
|
||||
) -> Curve:
|
||||
"""
|
||||
@ -695,6 +703,10 @@ class Waveform(PlotBase):
|
||||
dap(str): The dap model to use for the curve, only available for sync devices.
|
||||
If not specified, none will be added.
|
||||
Use the same string as is the name of the LMFit model.
|
||||
scan_id(str): Optional scan ID. When provided, the curve is treated as a **history** curve and
|
||||
the y‑data (and optional x‑data) are fetched from that historical scan. Such curves are
|
||||
never cleared by live‑scan resets.
|
||||
scan_number(int): Optional scan index. When provided, the curve is treated as a **history** curve and
|
||||
|
||||
Returns:
|
||||
Curve: The curve object.
|
||||
@ -764,6 +776,8 @@ class Waveform(PlotBase):
|
||||
label=label,
|
||||
color=color,
|
||||
source=source,
|
||||
scan_id=scan_id,
|
||||
scan_number=scan_number,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
@ -771,6 +785,9 @@ class Waveform(PlotBase):
|
||||
if source == "device":
|
||||
config.signal = DeviceSignal(name=y_name, entry=y_entry)
|
||||
|
||||
if scan_id is not None or scan_number is not None:
|
||||
config.source = "history"
|
||||
|
||||
# CREATE THE CURVE
|
||||
curve = self._add_curve(config=config, x_data=x_data, y_data=y_data)
|
||||
|
||||
@ -809,7 +826,7 @@ class Waveform(PlotBase):
|
||||
device_curve = self._find_curve_by_label(device_label)
|
||||
if not device_curve:
|
||||
raise ValueError(f"No existing curve found with label '{device_label}'.")
|
||||
if device_curve.config.source != "device":
|
||||
if device_curve.config.source not in ("device", "history"):
|
||||
raise ValueError(
|
||||
f"Curve '{device_label}' is not a device curve. Only device curves can have DAP."
|
||||
)
|
||||
@ -818,7 +835,7 @@ class Waveform(PlotBase):
|
||||
dev_entry = device_curve.config.signal.entry
|
||||
|
||||
# 2) Build a label for the new DAP curve
|
||||
dap_label = f"{dev_name}-{dev_entry}-{dap_name}"
|
||||
dap_label = f"{device_label}-{dap_name}"
|
||||
|
||||
# 3) Possibly raise if the DAP curve already exists
|
||||
if self._check_curve_id(dap_label):
|
||||
@ -871,7 +888,19 @@ class Waveform(PlotBase):
|
||||
ValueError: If a duplicate curve label/config is found, or if
|
||||
custom data is missing for `source='custom'`.
|
||||
"""
|
||||
scan_item: ScanDataContainer | None = None
|
||||
if config.source == "history":
|
||||
# Convert 1-based scan_number (from user/API) to 0-based list index
|
||||
list_index = config.scan_number - 1 if config.scan_number is not None else None
|
||||
scan_item = self.get_history_scan_item(scan_id=config.scan_id, scan_index=list_index)
|
||||
config.scan_id = scan_item.metadata["bec"]["scan_id"]
|
||||
# Preserve the 1-based scan_number for external labeling
|
||||
config.scan_number = scan_item.metadata["bec"]["scan_number"]
|
||||
|
||||
label = config.label
|
||||
if config.source == "history":
|
||||
label = f"{config.signal.name}-{config.signal.entry}-scan-{config.scan_number}"
|
||||
config.label = label
|
||||
if not label:
|
||||
# Fallback label
|
||||
label = WidgetContainerUtils.generate_unique_name(
|
||||
@ -893,7 +922,7 @@ class Waveform(PlotBase):
|
||||
raise ValueError("For 'custom' curves, x_data and y_data must be provided.")
|
||||
|
||||
# Actually create the Curve item
|
||||
curve = self._add_curve_object(name=label, config=config)
|
||||
curve = self._add_curve_object(name=label, config=config, scan_item=scan_item)
|
||||
|
||||
# If custom => set initial data
|
||||
if config.source == "custom" and x_data is not None and y_data is not None:
|
||||
@ -910,25 +939,126 @@ class Waveform(PlotBase):
|
||||
self.setup_dap_for_scan()
|
||||
self.roi_enable.emit(True) # Enable the ROI toolbar action
|
||||
self.request_dap() # Request DAP update directly without blocking proxy
|
||||
if config.source == "history":
|
||||
self._history_curves.append(curve)
|
||||
|
||||
return curve
|
||||
|
||||
def _add_curve_object(self, name: str, config: CurveConfig) -> Curve:
|
||||
def _add_curve_object(
|
||||
self, name: str, config: CurveConfig, scan_item: ScanDataContainer | None = None
|
||||
) -> Curve:
|
||||
"""
|
||||
Low-level creation of the PlotDataItem (Curve) from a `CurveConfig`.
|
||||
|
||||
Args:
|
||||
name (str): The name/label of the curve.
|
||||
config (CurveConfig): Configuration model describing the curve.
|
||||
scan_item (ScanDataContainer | None): Optional scan item for history curves.
|
||||
|
||||
Returns:
|
||||
Curve: The newly created curve object, added to the plot.
|
||||
"""
|
||||
curve = Curve(config=config, name=name, parent_item=self)
|
||||
self.plot_item.addItem(curve)
|
||||
if scan_item is not None:
|
||||
self._fetch_history_data_for_curve(curve, scan_item)
|
||||
self._categorise_device_curves()
|
||||
return curve
|
||||
|
||||
def _fetch_history_data_for_curve(
|
||||
self, curve: Curve, scan_item: ScanDataContainer
|
||||
) -> Curve | None:
|
||||
# Check if the data are already set
|
||||
device = curve.config.signal.name
|
||||
entry = curve.config.signal.entry
|
||||
y_device = None
|
||||
# 1. get y data
|
||||
x_data, y_data = curve.get_data()
|
||||
if y_data is None: # fetch only if not set, y device data are never chagne later
|
||||
y_device = scan_item.devices.get(device)
|
||||
if y_device is None:
|
||||
raise ValueError(f"Device '{device}' not found in scan item.")
|
||||
|
||||
y_data = y_device.get(entry).read().get("value")
|
||||
|
||||
# 2. get x data
|
||||
readout_priority = scan_item.metadata.get("bec", {}).get("readout_priority", [])
|
||||
all_devices_used = self._devices_from_readout_priority(readout_priority)
|
||||
|
||||
# Determine X-axis data
|
||||
if self.x_axis_mode["name"] == "index":
|
||||
x_data = np.arange(len(y_data))
|
||||
curve.config.current_x_mode = "index"
|
||||
elif self.x_axis_mode["name"] == "timestamp":
|
||||
if y_device is None:
|
||||
y_device = scan_item.devices.get(device)
|
||||
x_data = y_device.get(entry).read().get("timestamp")
|
||||
curve.config.current_x_mode = "timestamp"
|
||||
elif self.x_axis_mode["name"] not in ("index", "timestamp", "auto"):
|
||||
# Custom device mode
|
||||
if self.x_axis_mode["name"] not in all_devices_used:
|
||||
logger.warning(
|
||||
f"Custom device '{self.x_axis_mode['name']}' not found in scan item of history curve '{curve.name()}'."
|
||||
)
|
||||
curve.setVisible(False)
|
||||
return
|
||||
x_entry_custom = self.x_axis_mode.get("entry")
|
||||
if x_entry_custom is None:
|
||||
x_entry_custom = self.entry_validator.validate_signal(
|
||||
self.x_axis_mode["name"], None
|
||||
)
|
||||
x_device = scan_item.devices.get(self.x_axis_mode["name"])
|
||||
x_data = x_device.get(x_entry_custom).read().get("value")
|
||||
curve.config.current_x_mode = self.x_axis_mode["name"]
|
||||
elif self.x_axis_mode["name"] == "auto":
|
||||
if (
|
||||
self._current_x_device is None
|
||||
): # Scenario where no x device is set yet, because there was no live scan done in this widget yet
|
||||
# If no current x device, use the first motor from scan item
|
||||
scan_motors = self._ensure_str_list(
|
||||
scan_item.metadata.get("bec").get("scan_report_devices")
|
||||
)
|
||||
if not scan_motors: # scan was done without reported motor from whatever reason
|
||||
x_data = np.arange(len(y_data)) # Fallback to index
|
||||
curve.set_data(x=x_data, y=y_data)
|
||||
return curve
|
||||
x_entry = self.entry_validator.validate_signal(scan_motors[0], None)
|
||||
x_data = scan_item.devices.get(scan_motors[0]).get(x_entry).read().get("value")
|
||||
self._current_x_device = (scan_motors[0], x_entry)
|
||||
self._update_x_label_suffix(f" (auto: {scan_motors[0]}-{x_entry})")
|
||||
curve.config.current_x_mode = "auto"
|
||||
else: # Scan in auto mode was done and live scan already set the current x device
|
||||
if self._current_x_device[0] not in all_devices_used:
|
||||
logger.warning(
|
||||
f"Auto x data for device '{self._current_x_device[0]}' "
|
||||
f"and entry '{self._current_x_device[1]}'"
|
||||
f" not found in scan item of the history curve {curve.name()}."
|
||||
)
|
||||
curve.setVisible(False)
|
||||
return
|
||||
x_device = scan_item.devices.get(self._current_x_device[0])
|
||||
x_data = x_device.get(self._current_x_device[1]).read().get("value")
|
||||
curve.config.current_x_mode = "auto"
|
||||
if x_data is None:
|
||||
logger.warning(
|
||||
f"X data for curve '{curve.name()}' could not be determined. "
|
||||
f"Check if the x_mode '{self.x_axis_mode['name']}' is valid for the scan item."
|
||||
)
|
||||
curve.setVisible(False)
|
||||
return
|
||||
curve.set_data(x=x_data, y=y_data)
|
||||
return curve
|
||||
|
||||
def _refresh_history_curves(self):
|
||||
for curve in self._history_curves:
|
||||
scan_item = self.get_history_scan_item(
|
||||
scan_id=curve.config.scan_id, scan_index=curve.config.scan_number
|
||||
)
|
||||
if scan_item is not None:
|
||||
self._fetch_history_data_for_curve(curve, scan_item)
|
||||
else:
|
||||
logger.warning(f"Scan item for curve {curve.name()} not found.")
|
||||
|
||||
def _generate_color_from_palette(self) -> str:
|
||||
"""
|
||||
Generate a color for the next new curve, based on the current number of curves.
|
||||
@ -956,7 +1086,42 @@ class Waveform(PlotBase):
|
||||
Clear all data from the plot widget, but keep the curve references.
|
||||
"""
|
||||
for c in self.curves:
|
||||
c.clear_data()
|
||||
if c.config.source != "history":
|
||||
c.clear_data()
|
||||
|
||||
# X-axis compatibility helpers
|
||||
def _is_curve_compatible(self, curve: Curve) -> bool:
|
||||
"""
|
||||
Return True when *curve* can be shown with the current x-axis mode.
|
||||
|
||||
• ‘index’, ‘timestamp’ and ‘auto’ are always compatible.
|
||||
• For history curves we check whether the requested motor
|
||||
(self.x_axis_mode["name"]) exists in the cached
|
||||
history_data_buffer["x"] dictionary.
|
||||
• Device / DAP / custom curves are treated as compatible because
|
||||
their x-data are re-evaluated on every update.
|
||||
"""
|
||||
mode = self.x_axis_mode.get("name", "index")
|
||||
if mode in ("index", "timestamp"): # always compatible - wild west mode
|
||||
return True
|
||||
if curve.config.source == "history":
|
||||
scan_item = self.get_history_scan_item(
|
||||
scan_id=curve.config.scan_id, scan_index=curve.config.scan_number
|
||||
)
|
||||
# TODO double check this logic!!!
|
||||
curve = self._fetch_history_data_for_curve(curve, scan_item)
|
||||
if curve is None:
|
||||
return False
|
||||
if curve.config.source == "device":
|
||||
# TODO do some check for live curve, not yet implemented, check should be done from self.scan_item
|
||||
return True
|
||||
# TODO figure out how to handle DAP curves
|
||||
return True
|
||||
|
||||
def _update_curve_visibility(self) -> None:
|
||||
"""Show or hide curves according to `_is_curve_compatible`."""
|
||||
for c in self.curves:
|
||||
c.setVisible(self._is_curve_compatible(c))
|
||||
|
||||
def clear_all(self):
|
||||
"""
|
||||
@ -1118,7 +1283,7 @@ class Waveform(PlotBase):
|
||||
self.scan_id = current_scan_id
|
||||
self.scan_item = self.queue.scan_storage.find_scan_by_ID(self.scan_id) # live scan
|
||||
self._slice_index = None # Reset the slice index
|
||||
|
||||
self._update_curve_visibility()
|
||||
self._mode = self._categorise_device_curves()
|
||||
|
||||
# First trigger to sync and async data
|
||||
@ -1604,6 +1769,7 @@ class Waveform(PlotBase):
|
||||
entry_obj = data.get(x_name, {}).get(x_entry)
|
||||
x_data = entry_obj.read()["value"] if entry_obj else [0]
|
||||
new_suffix = f" (custom: {x_name}-{x_entry})"
|
||||
self._current_x_device = (x_name, x_entry)
|
||||
|
||||
# 2 User wants timestamp
|
||||
if self.x_axis_mode["name"] == "timestamp":
|
||||
@ -1618,11 +1784,13 @@ class Waveform(PlotBase):
|
||||
timestamps = entry_obj.read()["timestamp"] if entry_obj else [0]
|
||||
x_data = timestamps
|
||||
new_suffix = " (timestamp)"
|
||||
self._current_x_device = None
|
||||
|
||||
# 3 User wants index
|
||||
if self.x_axis_mode["name"] == "index":
|
||||
x_data = None
|
||||
new_suffix = " (index)"
|
||||
self._current_x_device = None
|
||||
|
||||
# 4 Best effort automatic mode
|
||||
if self.x_axis_mode["name"] is None or self.x_axis_mode["name"] == "auto":
|
||||
@ -1630,6 +1798,7 @@ class Waveform(PlotBase):
|
||||
if len(self._async_curves) > 0:
|
||||
x_data = None
|
||||
new_suffix = " (auto: index)"
|
||||
self._current_x_device = None
|
||||
# 4.2 If there are sync curves, use the first device from the scan report
|
||||
else:
|
||||
try:
|
||||
@ -1645,6 +1814,7 @@ class Waveform(PlotBase):
|
||||
entry_obj = data.get(x_name, {}).get(x_entry)
|
||||
x_data = entry_obj.read()["value"] if entry_obj else None
|
||||
new_suffix = f" (auto: {x_name}-{x_entry})"
|
||||
self._current_x_device = (x_name, x_entry)
|
||||
self._update_x_label_suffix(new_suffix)
|
||||
return x_data
|
||||
|
||||
@ -1661,30 +1831,6 @@ class Waveform(PlotBase):
|
||||
self.x_axis_mode["label_suffix"] = new_suffix
|
||||
self.set_x_label_suffix(new_suffix)
|
||||
|
||||
def _refresh_x_label_suffix(self):
|
||||
"""
|
||||
Update the x‑axis label suffix immediately based on the current
|
||||
``x_axis_mode`` dictionary, without waiting for the next data
|
||||
update cycle.
|
||||
"""
|
||||
mode = self.x_axis_mode.get("name", "auto")
|
||||
|
||||
if mode == "timestamp":
|
||||
suffix = " (timestamp)"
|
||||
elif mode == "index":
|
||||
suffix = " (index)"
|
||||
elif mode == "auto":
|
||||
suffix = " (auto)"
|
||||
else:
|
||||
x_name = mode
|
||||
x_entry = self.x_axis_mode.get("entry", None)
|
||||
if x_entry:
|
||||
suffix = f" (custom: {x_name}-{x_entry})"
|
||||
else:
|
||||
suffix = f" (custom: {x_name})"
|
||||
|
||||
self._update_x_label_suffix(suffix)
|
||||
|
||||
def _switch_x_axis_item(self, mode: str):
|
||||
"""
|
||||
Switch the x-axis mode between timestamp, index, the best effort and custom signal.
|
||||
@ -1786,7 +1932,7 @@ class Waveform(PlotBase):
|
||||
ScanDataContainer | None: The fetched scan item or None if no item was found.
|
||||
"""
|
||||
if scan_index is not None and scan_id is not None:
|
||||
raise ValueError("Only one of scan_id or scan_index can be provided.")
|
||||
scan_index = None # Only one of scan_id or scan_index can be provided -> scan id is more reliable
|
||||
|
||||
if scan_index is None and scan_id is None:
|
||||
logger.warning("Neither scan_id or scan_number was provided, fetching the latest scan")
|
||||
@ -1821,9 +1967,6 @@ class Waveform(PlotBase):
|
||||
scan_id(str, optional): ScanID of the scan to be updated. Defaults to None.
|
||||
scan_index(int, optional): Index of the scan to be updated. Defaults to None.
|
||||
"""
|
||||
if scan_index is not None and scan_id is not None:
|
||||
raise ValueError("Only one of scan_id or scan_index can be provided.")
|
||||
|
||||
self.scan_item = self.get_history_scan_item(scan_index=scan_index, scan_id=scan_id)
|
||||
|
||||
if self.scan_item is None:
|
||||
@ -1832,7 +1975,7 @@ class Waveform(PlotBase):
|
||||
if scan_id is not None:
|
||||
self.scan_id = scan_id
|
||||
else:
|
||||
# If scan_index was used, set the scan_id from the fetched item
|
||||
# If scan_number was used, set the scan_id from the fetched item
|
||||
if hasattr(self.scan_item, "metadata"):
|
||||
self.scan_id = self.scan_item.metadata["bec"]["scan_id"]
|
||||
else:
|
||||
@ -1950,6 +2093,35 @@ class Waveform(PlotBase):
|
||||
return True
|
||||
return False
|
||||
|
||||
def _devices_from_readout_priority(
|
||||
self, rp: dict | None, *, exclude_keys: set[str] | None = None # keyword-only
|
||||
) -> list[str]:
|
||||
"""
|
||||
Flatten a BEC *readout_priority* mapping into a list[str] of device names.
|
||||
|
||||
Args:
|
||||
rp(dict | None): The readout priority mapping from the scan report.
|
||||
|
||||
Returns:
|
||||
list[str]: A list of unique device names from the readout priority mapping.
|
||||
"""
|
||||
if not rp:
|
||||
return []
|
||||
|
||||
if exclude_keys is None: # baseline readings are excluded by default
|
||||
exclude_keys = {"baseline"}
|
||||
|
||||
seen: set[str] = set()
|
||||
devices: list[str] = []
|
||||
for key, value in rp.items():
|
||||
if key in exclude_keys:
|
||||
continue
|
||||
for dev in self._ensure_str_list(value):
|
||||
if dev not in seen:
|
||||
seen.add(dev)
|
||||
devices.append(dev)
|
||||
return devices
|
||||
|
||||
def _ensure_str_list(self, entries: list | tuple | np.ndarray):
|
||||
"""
|
||||
Convert a variety of possible inputs (string, bytes, list/tuple/ndarray of either)
|
||||
@ -2081,6 +2253,7 @@ class DemoApp(QMainWindow): # pragma: no cover
|
||||
|
||||
self.waveform_popup = Waveform(popups=True)
|
||||
self.waveform_popup.plot(y_name="waveform")
|
||||
self.waveform_popup.plot("bpm4i", scan_number=0)
|
||||
|
||||
self.waveform_side = Waveform(popups=False)
|
||||
self.waveform_side.plot(y_name="bpm4i", y_entry="bpm4i", dap="GaussianModel")
|
||||
|
Reference in New Issue
Block a user