From 70fd1b7be4faaea9379d9f3fefdfb2e92d49c840 Mon Sep 17 00:00:00 2001 From: wakonig_k Date: Sun, 7 Jul 2024 12:37:31 +0200 Subject: [PATCH] feat(figure): added support for async device readbacks as curve data --- bec_widgets/utils/bec_dispatcher.py | 7 ++- .../widgets/figure/plots/waveform/waveform.py | 18 +++++- .../figure/plots/waveform/waveform_curve.py | 57 ++++++++++++++++++- 3 files changed, 75 insertions(+), 7 deletions(-) diff --git a/bec_widgets/utils/bec_dispatcher.py b/bec_widgets/utils/bec_dispatcher.py index 8c6c52c7..9a93aa2a 100644 --- a/bec_widgets/utils/bec_dispatcher.py +++ b/bec_widgets/utils/bec_dispatcher.py @@ -134,7 +134,10 @@ class BECDispatcher: cls.qapp = None def connect_slot( - self, slot: Callable, topics: Union[EndpointInfo, str, list[Union[EndpointInfo, str]]] + self, + slot: Callable, + topics: Union[EndpointInfo, str, list[Union[EndpointInfo, str]]], + **kwargs, ) -> None: """Connect widget's pyqt slot, so that it is called on new pub/sub topic message. @@ -144,7 +147,7 @@ class BECDispatcher: topics (EndpointInfo | str | list): A topic or list of topics that can typically be acquired via bec_lib.MessageEndpoints """ slot = QtThreadSafeCallback(slot) - self.client.connector.register(topics, cb=slot) + self.client.connector.register(topics, cb=slot, **kwargs) topics_str, _ = self.client.connector._convert_endpointinfo(topics) self._slots[slot].update(set(topics_str)) diff --git a/bec_widgets/widgets/figure/plots/waveform/waveform.py b/bec_widgets/widgets/figure/plots/waveform/waveform.py index 160b530c..ce1ff7eb 100644 --- a/bec_widgets/widgets/figure/plots/waveform/waveform.py +++ b/bec_widgets/widgets/figure/plots/waveform/waveform.py @@ -7,6 +7,7 @@ from typing import Any, Literal, Optional import numpy as np import pyqtgraph as pg from bec_lib import messages +from bec_lib.device import ReadoutPriority from bec_lib.endpoints import MessageEndpoints from bec_lib.scan_data import ScanData from pydantic import Field, ValidationError @@ -327,7 +328,7 @@ class BECWaveform(BECPlotBase): color_map_z: Optional[str] = "plasma", label: Optional[str] = None, validate_bec: bool = True, - source: str = "scan_segment", + source: Optional[str] = None, dap: Optional[str] = None, **kwargs, ) -> BECCurve: @@ -349,8 +350,6 @@ class BECWaveform(BECPlotBase): Returns: BECCurve: The curve object. """ - # Check if curve already exists - curve_source = source # Get entry if not provided and validate x_entry, y_entry, z_entry = self._validate_signal_entries( @@ -362,6 +361,7 @@ class BECWaveform(BECPlotBase): else: label = label or f"{y_name}-{y_entry}" + # Check if curve already exists curve_exits = self._check_curve_id(label, self._curves_data) if curve_exits: raise ValueError(f"Curve with ID '{label}' already exists in widget '{self.gui_id}'.") @@ -373,6 +373,18 @@ class BECWaveform(BECPlotBase): )[-1] ) + if source is None: + if validate_bec: + curve_source = ( + "async_readback" + if self.dev[y_name].readout_priority == ReadoutPriority.ASYNC + else "scan_segment" + ) + else: + curve_source = "scan_segment" + else: + curve_source = source + # Create curve by config curve_config = CurveConfig( widget_class="BECCurve", diff --git a/bec_widgets/widgets/figure/plots/waveform/waveform_curve.py b/bec_widgets/widgets/figure/plots/waveform/waveform_curve.py index fc2f4962..89c8ab13 100644 --- a/bec_widgets/widgets/figure/plots/waveform/waveform_curve.py +++ b/bec_widgets/widgets/figure/plots/waveform/waveform_curve.py @@ -1,15 +1,17 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Any, Literal, Optional +from typing import TYPE_CHECKING, Literal, Optional import pyqtgraph as pg +from bec_lib.endpoints import MessageEndpoints from pydantic import BaseModel, Field, field_validator -from pydantic_core import PydanticCustomError from qtpy import QtCore from bec_widgets.utils import BECConnector, Colors, ConnectionConfig if TYPE_CHECKING: + import numpy as np + from bec_widgets.widgets.figure.plots.waveform import BECWaveform1D @@ -103,6 +105,14 @@ class BECCurve(BECConnector, pg.PlotDataItem): if kwargs: self.set(**kwargs) + self._async_info = {} + + if self.config.source == "async_readback": + # get updates on new scans in order to change the subscription to the correct async readback + self.bec_dispatcher.connect_slot( + self.on_scan_status_update, MessageEndpoints.scan_status() + ) + def apply_config(self): pen_style_map = { "solid": QtCore.Qt.SolidLine, @@ -137,6 +147,49 @@ class BECCurve(BECConnector, pg.PlotDataItem): else: raise ValueError(f"Source {self.config.source} do not allow custom data setting.") + def on_scan_status_update(self, content: dict, metadata: dict): + """ + Update the async info with the latest scan status. + """ + scan_id = content.get("scan_id") + if scan_id == self._async_info.get("scan_id"): + return + active_subscription = self._async_info.get("active_subscription") + if active_subscription: + self.bec_dispatcher.disconnect_slot(self.on_async_update, active_subscription) + self._async_info["scan_id"] = scan_id + self._async_info["active_subscription"] = MessageEndpoints.device_async_readback( + scan_id, self.config.signals.y.name + ) + self._async_info["data"] = [] + self.bec_dispatcher.connect_slot( + self.on_async_update, + MessageEndpoints.device_async_readback(scan_id, self.config.signals.y.name), + from_start=True, + ) + + def on_async_update(self, content: dict, metadata: dict): + """ + Update the curve with the latest async readback data. + """ + async_update = metadata.get("async_update") + if not async_update: + return + signals = content.get("signals") + if not signals: + return + y_data = signals.get(self.config.signals.y.name, {}).get("value", []) + if y_data is None: + return + if async_update == "extend": + self._async_info["data"].extend(y_data) + elif async_update == "replace": + self._async_info["data"] = y_data + else: + print(f"Warning: Unsupported async update type {async_update}.") + + self.setData(self._async_info["data"]) + def set(self, **kwargs): """ Set the properties of the curve.