0
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2025-07-13 11:11:49 +02:00

feat(figure): added support for async device readbacks as curve data

This commit is contained in:
2024-07-07 12:37:31 +02:00
parent fc3a69bbb0
commit 70fd1b7be4
3 changed files with 75 additions and 7 deletions

View File

@ -134,7 +134,10 @@ class BECDispatcher:
cls.qapp = None cls.qapp = None
def connect_slot( def connect_slot(
self, slot: Callable, topics: Union[EndpointInfo, str, list[Union[EndpointInfo, str]]] self,
slot: Callable,
topics: Union[EndpointInfo, str, list[Union[EndpointInfo, str]]],
**kwargs,
) -> None: ) -> None:
"""Connect widget's pyqt slot, so that it is called on new pub/sub topic message. """Connect widget's pyqt slot, so that it is called on new pub/sub topic message.
@ -144,7 +147,7 @@ class BECDispatcher:
topics (EndpointInfo | str | list): A topic or list of topics that can typically be acquired via bec_lib.MessageEndpoints topics (EndpointInfo | str | list): A topic or list of topics that can typically be acquired via bec_lib.MessageEndpoints
""" """
slot = QtThreadSafeCallback(slot) slot = QtThreadSafeCallback(slot)
self.client.connector.register(topics, cb=slot) self.client.connector.register(topics, cb=slot, **kwargs)
topics_str, _ = self.client.connector._convert_endpointinfo(topics) topics_str, _ = self.client.connector._convert_endpointinfo(topics)
self._slots[slot].update(set(topics_str)) self._slots[slot].update(set(topics_str))

View File

@ -7,6 +7,7 @@ from typing import Any, Literal, Optional
import numpy as np import numpy as np
import pyqtgraph as pg import pyqtgraph as pg
from bec_lib import messages from bec_lib import messages
from bec_lib.device import ReadoutPriority
from bec_lib.endpoints import MessageEndpoints from bec_lib.endpoints import MessageEndpoints
from bec_lib.scan_data import ScanData from bec_lib.scan_data import ScanData
from pydantic import Field, ValidationError from pydantic import Field, ValidationError
@ -327,7 +328,7 @@ class BECWaveform(BECPlotBase):
color_map_z: Optional[str] = "plasma", color_map_z: Optional[str] = "plasma",
label: Optional[str] = None, label: Optional[str] = None,
validate_bec: bool = True, validate_bec: bool = True,
source: str = "scan_segment", source: Optional[str] = None,
dap: Optional[str] = None, dap: Optional[str] = None,
**kwargs, **kwargs,
) -> BECCurve: ) -> BECCurve:
@ -349,8 +350,6 @@ class BECWaveform(BECPlotBase):
Returns: Returns:
BECCurve: The curve object. BECCurve: The curve object.
""" """
# Check if curve already exists
curve_source = source
# Get entry if not provided and validate # Get entry if not provided and validate
x_entry, y_entry, z_entry = self._validate_signal_entries( x_entry, y_entry, z_entry = self._validate_signal_entries(
@ -362,6 +361,7 @@ class BECWaveform(BECPlotBase):
else: else:
label = label or f"{y_name}-{y_entry}" label = label or f"{y_name}-{y_entry}"
# Check if curve already exists
curve_exits = self._check_curve_id(label, self._curves_data) curve_exits = self._check_curve_id(label, self._curves_data)
if curve_exits: if curve_exits:
raise ValueError(f"Curve with ID '{label}' already exists in widget '{self.gui_id}'.") raise ValueError(f"Curve with ID '{label}' already exists in widget '{self.gui_id}'.")
@ -373,6 +373,18 @@ class BECWaveform(BECPlotBase):
)[-1] )[-1]
) )
if source is None:
if validate_bec:
curve_source = (
"async_readback"
if self.dev[y_name].readout_priority == ReadoutPriority.ASYNC
else "scan_segment"
)
else:
curve_source = "scan_segment"
else:
curve_source = source
# Create curve by config # Create curve by config
curve_config = CurveConfig( curve_config = CurveConfig(
widget_class="BECCurve", widget_class="BECCurve",

View File

@ -1,15 +1,17 @@
from __future__ import annotations from __future__ import annotations
from typing import TYPE_CHECKING, Any, Literal, Optional from typing import TYPE_CHECKING, Literal, Optional
import pyqtgraph as pg import pyqtgraph as pg
from bec_lib.endpoints import MessageEndpoints
from pydantic import BaseModel, Field, field_validator from pydantic import BaseModel, Field, field_validator
from pydantic_core import PydanticCustomError
from qtpy import QtCore from qtpy import QtCore
from bec_widgets.utils import BECConnector, Colors, ConnectionConfig from bec_widgets.utils import BECConnector, Colors, ConnectionConfig
if TYPE_CHECKING: if TYPE_CHECKING:
import numpy as np
from bec_widgets.widgets.figure.plots.waveform import BECWaveform1D from bec_widgets.widgets.figure.plots.waveform import BECWaveform1D
@ -103,6 +105,14 @@ class BECCurve(BECConnector, pg.PlotDataItem):
if kwargs: if kwargs:
self.set(**kwargs) self.set(**kwargs)
self._async_info = {}
if self.config.source == "async_readback":
# get updates on new scans in order to change the subscription to the correct async readback
self.bec_dispatcher.connect_slot(
self.on_scan_status_update, MessageEndpoints.scan_status()
)
def apply_config(self): def apply_config(self):
pen_style_map = { pen_style_map = {
"solid": QtCore.Qt.SolidLine, "solid": QtCore.Qt.SolidLine,
@ -137,6 +147,49 @@ class BECCurve(BECConnector, pg.PlotDataItem):
else: else:
raise ValueError(f"Source {self.config.source} do not allow custom data setting.") raise ValueError(f"Source {self.config.source} do not allow custom data setting.")
def on_scan_status_update(self, content: dict, metadata: dict):
"""
Update the async info with the latest scan status.
"""
scan_id = content.get("scan_id")
if scan_id == self._async_info.get("scan_id"):
return
active_subscription = self._async_info.get("active_subscription")
if active_subscription:
self.bec_dispatcher.disconnect_slot(self.on_async_update, active_subscription)
self._async_info["scan_id"] = scan_id
self._async_info["active_subscription"] = MessageEndpoints.device_async_readback(
scan_id, self.config.signals.y.name
)
self._async_info["data"] = []
self.bec_dispatcher.connect_slot(
self.on_async_update,
MessageEndpoints.device_async_readback(scan_id, self.config.signals.y.name),
from_start=True,
)
def on_async_update(self, content: dict, metadata: dict):
"""
Update the curve with the latest async readback data.
"""
async_update = metadata.get("async_update")
if not async_update:
return
signals = content.get("signals")
if not signals:
return
y_data = signals.get(self.config.signals.y.name, {}).get("value", [])
if y_data is None:
return
if async_update == "extend":
self._async_info["data"].extend(y_data)
elif async_update == "replace":
self._async_info["data"] = y_data
else:
print(f"Warning: Unsupported async update type {async_update}.")
self.setData(self._async_info["data"])
def set(self, **kwargs): def set(self, **kwargs):
""" """
Set the properties of the curve. Set the properties of the curve.