fix(waveform): classify curves by signal class, not device readout priority

This commit is contained in:
2026-06-18 16:43:23 +02:00
committed by Jan Wyzula
co-authored by Jan Wyzula
parent e9a7b03243
commit d6231726d6
5 changed files with 227 additions and 6 deletions
+10 -2
View File
@@ -11,7 +11,13 @@ from bec_lib.devicemanager import DeviceContainer
class FakeDevice(BECDevice):
"""Fake minimal positioner class for testing."""
def __init__(self, name, enabled=True, readout_priority=ReadoutPriority.MONITORED):
def __init__(
self,
name,
enabled=True,
readout_priority=ReadoutPriority.MONITORED,
signal_class: str | None = None,
):
super().__init__(name=name)
self._enabled = enabled
self.signals = {self.name: {"value": 1.0}}
@@ -26,13 +32,15 @@ class FakeDevice(BECDevice):
"readOnly": False,
"name": self.name,
}
if signal_class is None:
signal_class = "AsyncSignal" if readout_priority == ReadoutPriority.ASYNC else "Signal"
self._info = {
"signals": {
self.name: {
"kind_str": "hinted",
"component_name": self.name,
"obj_name": self.name,
"signal_class": "Signal",
"signal_class": signal_class,
}
}
}
@@ -0,0 +1,84 @@
from __future__ import annotations
from enum import Enum
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from bec_lib.device import DeviceBase
ASYNC_SIGNAL_CLASSES = frozenset({"AsyncSignal", "AsyncMultiSignal", "DynamicSignal"})
_NON_CURVE_ROLES = frozenset({"preview", "diagnostic", "file_event", "progress"})
class SignalCategory(str, Enum):
"""Data-delivery category of a signal."""
SYNC = "sync"
ASYNC = "async"
UNKNOWN = "unknown"
def classify_signal_info(signal_info: dict[str, Any] | None) -> SignalCategory:
"""
Classify a serialized signal-info dict (one entry of
``device._info["signals"]``).
Args:
signal_info: The serialized signal info, or None.
Returns:
SignalCategory: ASYNC for the DynamicSignal family (including
subclasses detected via the embedded ``signal_info`` block), SYNC for
all other concrete signal classes, UNKNOWN when no decision is
possible.
"""
if not isinstance(signal_info, dict) or not signal_info:
return SignalCategory.UNKNOWN
signal_class = signal_info.get("signal_class")
if signal_class in ASYNC_SIGNAL_CLASSES:
return SignalCategory.ASYNC
describe = signal_info.get("describe")
embedded = describe.get("signal_info") if isinstance(describe, dict) else None
if isinstance(embedded, dict):
role = embedded.get("role", "main")
if role in _NON_CURVE_ROLES:
return SignalCategory.UNKNOWN
return SignalCategory.ASYNC
if signal_class:
return SignalCategory.SYNC
return SignalCategory.UNKNOWN
def classify_device_signal(device: DeviceBase | None, entry: str) -> SignalCategory:
"""
Classify a signal of a device by its serialized info.
Args:
device: The client device (``bec_lib.device.DeviceBase``) from the device
manager, or None (e.g. history data whose device no longer exists).
entry: The signal entry name (component name used in curve configs).
Returns:
SignalCategory: The category, UNKNOWN when the device or entry cannot
be resolved.
"""
if device is None or not entry:
return SignalCategory.UNKNOWN
info = getattr(device, "_info", None)
if not isinstance(info, dict):
return SignalCategory.UNKNOWN
signals = info.get("signals")
if not isinstance(signals, dict):
return SignalCategory.UNKNOWN
signal_info = signals.get(entry)
if signal_info is None:
for candidate in signals.values():
if isinstance(candidate, dict) and candidate.get("obj_name") == entry:
signal_info = candidate
break
return classify_signal_info(signal_info)
+10 -4
View File
@@ -33,6 +33,7 @@ from bec_widgets.utils.container_utils import WidgetContainerUtils
from bec_widgets.utils.error_popups import SafeProperty, SafeSlot
from bec_widgets.utils.settings_dialog import SettingsDialog
from bec_widgets.utils.side_panel import SidePanel
from bec_widgets.utils.signal_classification import SignalCategory, classify_device_signal
from bec_widgets.utils.toolbars.bundles import ToolbarBundle
from bec_widgets.utils.toolbars.toolbar import MaterialIconAction
from bec_widgets.widgets.dap.lmfit_dialog.lmfit_dialog import LMFitDialog
@@ -2308,21 +2309,26 @@ class Waveform(PlotBase):
readout_priority_async = self._ensure_str_list(readout_priority.get("async", []))
readout_priority_sync = self._ensure_str_list(readout_priority.get("monitored", []))
# Iterate over all curves
for curve in self.curves:
if curve.config.source != "device":
continue
dev_name = curve.config.signal.device
if dev_name in readout_priority_async:
entry = curve.config.signal.signal
category = classify_device_signal(self.dev.get(dev_name), entry)
if category == SignalCategory.ASYNC or dev_name in readout_priority_async:
self._async_curves.append(curve)
if hasattr(self.scan_item, "live_data"):
self._setup_async_curve(curve)
found_async = True
elif dev_name in readout_priority_sync:
elif category == SignalCategory.SYNC or dev_name in readout_priority_sync:
self._sync_curves.append(curve)
found_sync = True
else:
logger.warning("Device {dev_name} not found in readout priority list.")
logger.warning(
f"Cannot classify signal {dev_name}.{entry}: no signal info and "
"device not found in readout priority list."
)
continue
# Determine the mode of the scan
if found_async and found_sync:
mode = "mixed"
@@ -0,0 +1,78 @@
from types import SimpleNamespace
from bec_widgets.utils.signal_classification import (
SignalCategory,
classify_device_signal,
classify_signal_info,
)
def _info(signal_class=None, role=None, with_embedded=False):
info = {"signal_class": signal_class, "component_name": "comp", "obj_name": "dev_comp"}
if with_embedded:
info["describe"] = {"signal_info": {"role": role or "main", "signals": [["comp", 5]]}}
return info
def _device(signals: dict):
return SimpleNamespace(_info={"signals": signals})
def test_sync_signal_classes_are_sync():
for cls in ("Signal", "EpicsSignal", "EpicsSignalRO", "SetableSignal", "ReadOnlySignal"):
assert classify_signal_info(_info(signal_class=cls)) == SignalCategory.SYNC
def test_async_signal_classes_are_async():
for cls in ("AsyncSignal", "AsyncMultiSignal", "DynamicSignal"):
assert classify_signal_info(_info(signal_class=cls)) == SignalCategory.ASYNC
def test_dynamic_signal_subclass_detected_via_embedded_info():
"""A DynamicSignal subclass serializes its concrete class name; the
embedded signal_info block must still classify it as async."""
info = _info(signal_class="MyBeamlineAsyncSignal", with_embedded=True)
assert classify_signal_info(info) == SignalCategory.ASYNC
def test_non_curve_roles_are_unknown():
for role in ("preview", "diagnostic", "file_event", "progress"):
info = _info(signal_class="PreviewSignal", role=role, with_embedded=True)
assert classify_signal_info(info) == SignalCategory.UNKNOWN
def test_missing_or_empty_info_is_unknown():
assert classify_signal_info(None) == SignalCategory.UNKNOWN
assert classify_signal_info({}) == SignalCategory.UNKNOWN
assert classify_signal_info({"signal_class": None}) == SignalCategory.UNKNOWN
def test_monitored_device_with_mixed_signals():
"""One (monitored) device exposing both
a synchronous and an asynchronous signal must classify per signal, not
per device."""
device = _device(
{
"readback": _info(signal_class="Signal"),
"stream": _info(signal_class="AsyncSignal"),
"multi": _info(signal_class="AsyncMultiSignal"),
}
)
assert classify_device_signal(device, "readback") == SignalCategory.SYNC
assert classify_device_signal(device, "stream") == SignalCategory.ASYNC
assert classify_device_signal(device, "multi") == SignalCategory.ASYNC
def test_entry_resolved_via_obj_name():
info = _info(signal_class="AsyncSignal")
info["obj_name"] = "dev1_stream"
device = _device({"stream": info})
assert classify_device_signal(device, "dev1_stream") == SignalCategory.ASYNC
def test_unresolvable_device_or_entry_is_unknown():
assert classify_device_signal(None, "x") == SignalCategory.UNKNOWN
assert classify_device_signal(SimpleNamespace(), "x") == SignalCategory.UNKNOWN
assert classify_device_signal(_device({}), "missing") == SignalCategory.UNKNOWN
assert classify_device_signal(_device({"a": _info("Signal")}), "") == SignalCategory.UNKNOWN
+45
View File
@@ -1642,3 +1642,48 @@ def test_history_curve_scan_not_found_returns_none(qtbot, mocked_client):
# No history scans injected for this widget
c = wf.plot(device_y="bpm4i", signal_y="bpm4i", scan_id="unknown_scan")
assert c is None
def test_categorise_device_curves_monitored_device_with_async_signal(qtbot, mocked_client):
"""Device listed under 'monitored'
readout priority may expose an asynchronous signal; the curve must be
classified by the signal class, not the parent device's readout priority."""
wf = create_widget(qtbot, Waveform, client=mocked_client)
dummy_scan = create_dummy_scan_item()
wf.scan_item = dummy_scan
# bpm4i is listed under 'monitored' in the dummy scan; give it an
# additional async signal, as a mixed device would have.
device = mocked_client.device_manager.devices["bpm4i"]
device.signals["bpm4i_stream"] = {"value": 0.0}
device._info["signals"]["bpm4i_stream"] = {
"kind_str": "hinted",
"component_name": "bpm4i_stream",
"obj_name": "bpm4i_stream",
"signal_class": "AsyncSignal",
}
c_sync = wf.plot(arg1="bpm4i", label="sync-curve")
c_async = wf.plot(device_y="bpm4i", signal_y="bpm4i_stream", label="async-curve")
mode = wf._categorise_device_curves()
assert mode == "mixed"
assert c_sync in wf._sync_curves
assert c_async in wf._async_curves
def test_categorise_device_curves_falls_back_to_readout_priority(qtbot, mocked_client):
"""When no signal info is available (e.g. history data for a removed
device), classification falls back to the scan's readout-priority lists."""
wf = create_widget(qtbot, Waveform, client=mocked_client)
dummy_scan = create_dummy_scan_item()
wf.scan_item = dummy_scan
c_async = wf.plot(arg1="async_device", label="fallback-curve")
# Simulate the device's signal info being unavailable.
mocked_client.device_manager.devices["async_device"]._info = {}
wf._categorise_device_curves()
assert c_async in wf._async_curves