diff --git a/bec_widgets/widgets/plots/waveform/waveform.py b/bec_widgets/widgets/plots/waveform/waveform.py index 3d6127dc..1c075029 100644 --- a/bec_widgets/widgets/plots/waveform/waveform.py +++ b/bec_widgets/widgets/plots/waveform/waveform.py @@ -1,7 +1,7 @@ from __future__ import annotations import json -from typing import Literal +from typing import Any, Literal import lmfit import numpy as np @@ -163,7 +163,7 @@ class Waveform(PlotBase): self._async_curves = [] self._slice_index = None self._dap_curves = [] - self._mode: Literal["none", "sync", "async", "mixed"] = "none" + self._mode = None # Scan data self._scan_done = True # means scan is not running @@ -1139,7 +1139,7 @@ class Waveform(PlotBase): QTimer.singleShot(100, self.update_sync_curves) QTimer.singleShot(300, self.update_sync_curves) - def _fetch_scan_data_and_access(self): + def _fetch_scan_data_and_access(self) -> tuple[dict, str] | tuple[None, None]: """ Decide whether the widget is in live or historical mode and return the appropriate data dict and access key. @@ -1153,7 +1153,7 @@ class Waveform(PlotBase): self.update_with_scan_history(-1) if self.scan_item is None: logger.info("No scan executed so far; skipping device curves categorisation.") - return "none", "none" + return None, None if hasattr(self.scan_item, "live_data"): # Live scan @@ -1169,7 +1169,7 @@ class Waveform(PlotBase): """ if self.scan_item is None: logger.info("No scan executed so far; skipping device curves categorisation.") - return "none" + return data, access_key = self._fetch_scan_data_and_access() for curve in self._sync_curves: device_name = curve.config.signal.name @@ -1177,9 +1177,8 @@ class Waveform(PlotBase): if access_key == "val": device_data = data.get(device_name, {}).get(device_entry, {}).get(access_key, None) else: - device_data = ( - data.get(device_name, {}).get(device_entry, {}).read().get("value", None) - ) + entry_obj = data.get(device_name, {}).get(device_entry) + device_data = entry_obj.read()["value"] if entry_obj else None x_data = self._get_x_data(device_name, device_entry) if x_data is not None: if len(x_data) == 1: @@ -1217,7 +1216,8 @@ class Waveform(PlotBase): if self._skip_large_dataset_check is False: if not self._check_dataset_size_and_confirm(dataset_obj, device_entry): continue # user declined to load; skip this curve - device_data = dataset_obj.get(device_entry, {}).read().get("value", None) + entry_obj = dataset_obj.get(device_entry, None) + device_data = entry_obj.read()["value"] if entry_obj else None # if shape is 2D cast it into 1D and take the last waveform if len(np.shape(device_data)) > 1: @@ -1549,15 +1549,21 @@ class Waveform(PlotBase): if access_key == "val": # live data x_data = data.get(x_name, {}).get(x_entry, {}).get(access_key, [0]) else: # history data - x_data = data.get(x_name, {}).get(x_entry, {}).read().get("value", [0]) + entry_obj = data.get(x_name, {}).get(x_entry) + x_data = entry_obj.read()["value"] if entry_obj else [0] new_suffix = f" (custom: {x_name}-{x_entry})" # 2 User wants timestamp if self.x_axis_mode["name"] == "timestamp": if access_key == "val": # live - timestamps = data[device_name][device_entry].timestamps + x_data = data.get(device_name, {}).get(device_entry, None) + if x_data is None: + return None + else: + timestamps = x_data.timestamps else: # history data - timestamps = data[device_name][device_entry].read().get("timestamp", [0]) + entry_obj = data.get(device_name, {}).get(device_entry) + timestamps = entry_obj.read()["timestamp"] if entry_obj else [0] x_data = timestamps new_suffix = " (timestamp)" @@ -1584,7 +1590,8 @@ class Waveform(PlotBase): if access_key == "val": x_data = data.get(x_name, {}).get(x_entry, {}).get(access_key, None) else: - x_data = data.get(x_name, {}).get(x_entry, {}).read().get("value", None) + entry_obj = data.get(x_name, {}).get(x_entry) + x_data = entry_obj.read()["value"] if entry_obj else None new_suffix = f" (auto: {x_name}-{x_entry})" self._update_x_label_suffix(new_suffix) return x_data @@ -1637,7 +1644,7 @@ class Waveform(PlotBase): self.update_with_scan_history(-1) if self.scan_item is None: logger.info("No scan executed so far; skipping device curves categorisation.") - return "none" + return None if hasattr(self.scan_item, "live_data"): readout_priority = self.scan_item.status_message.info["readout_priority"] # live data