diff --git a/bec_widgets/tests/utils.py b/bec_widgets/tests/utils.py index bf12d37c..b0522a4f 100644 --- a/bec_widgets/tests/utils.py +++ b/bec_widgets/tests/utils.py @@ -11,7 +11,13 @@ from bec_lib.devicemanager import DeviceContainer class FakeDevice(BECDevice): """Fake minimal positioner class for testing.""" - def __init__(self, name, enabled=True, readout_priority=ReadoutPriority.MONITORED): + def __init__( + self, + name, + enabled=True, + readout_priority=ReadoutPriority.MONITORED, + signal_class: str | None = None, + ): super().__init__(name=name) self._enabled = enabled self.signals = {self.name: {"value": 1.0}} @@ -26,13 +32,15 @@ class FakeDevice(BECDevice): "readOnly": False, "name": self.name, } + if signal_class is None: + signal_class = "AsyncSignal" if readout_priority == ReadoutPriority.ASYNC else "Signal" self._info = { "signals": { self.name: { "kind_str": "hinted", "component_name": self.name, "obj_name": self.name, - "signal_class": "Signal", + "signal_class": signal_class, } } } diff --git a/bec_widgets/utils/signal_classification.py b/bec_widgets/utils/signal_classification.py new file mode 100644 index 00000000..594cc0cc --- /dev/null +++ b/bec_widgets/utils/signal_classification.py @@ -0,0 +1,84 @@ +from __future__ import annotations + +from enum import Enum +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from bec_lib.device import DeviceBase + +ASYNC_SIGNAL_CLASSES = frozenset({"AsyncSignal", "AsyncMultiSignal", "DynamicSignal"}) +_NON_CURVE_ROLES = frozenset({"preview", "diagnostic", "file_event", "progress"}) + + +class SignalCategory(str, Enum): + """Data-delivery category of a signal.""" + + SYNC = "sync" + ASYNC = "async" + UNKNOWN = "unknown" + + +def classify_signal_info(signal_info: dict[str, Any] | None) -> SignalCategory: + """ + Classify a serialized signal-info dict (one entry of + ``device._info["signals"]``). + + Args: + signal_info: The serialized signal info, or None. + + Returns: + SignalCategory: ASYNC for the DynamicSignal family (including + subclasses detected via the embedded ``signal_info`` block), SYNC for + all other concrete signal classes, UNKNOWN when no decision is + possible. + """ + if not isinstance(signal_info, dict) or not signal_info: + return SignalCategory.UNKNOWN + + signal_class = signal_info.get("signal_class") + if signal_class in ASYNC_SIGNAL_CLASSES: + return SignalCategory.ASYNC + + describe = signal_info.get("describe") + embedded = describe.get("signal_info") if isinstance(describe, dict) else None + if isinstance(embedded, dict): + role = embedded.get("role", "main") + if role in _NON_CURVE_ROLES: + return SignalCategory.UNKNOWN + return SignalCategory.ASYNC + + if signal_class: + return SignalCategory.SYNC + + return SignalCategory.UNKNOWN + + +def classify_device_signal(device: DeviceBase | None, entry: str) -> SignalCategory: + """ + Classify a signal of a device by its serialized info. + + Args: + device: The client device (``bec_lib.device.DeviceBase``) from the device + manager, or None (e.g. history data whose device no longer exists). + entry: The signal entry name (component name used in curve configs). + + Returns: + SignalCategory: The category, UNKNOWN when the device or entry cannot + be resolved. + """ + if device is None or not entry: + return SignalCategory.UNKNOWN + info = getattr(device, "_info", None) + if not isinstance(info, dict): + return SignalCategory.UNKNOWN + signals = info.get("signals") + if not isinstance(signals, dict): + return SignalCategory.UNKNOWN + + signal_info = signals.get(entry) + if signal_info is None: + for candidate in signals.values(): + if isinstance(candidate, dict) and candidate.get("obj_name") == entry: + signal_info = candidate + break + return classify_signal_info(signal_info) diff --git a/bec_widgets/widgets/plots/waveform/waveform.py b/bec_widgets/widgets/plots/waveform/waveform.py index 35a2873b..11a1b7c0 100644 --- a/bec_widgets/widgets/plots/waveform/waveform.py +++ b/bec_widgets/widgets/plots/waveform/waveform.py @@ -33,6 +33,7 @@ from bec_widgets.utils.container_utils import WidgetContainerUtils from bec_widgets.utils.error_popups import SafeProperty, SafeSlot from bec_widgets.utils.settings_dialog import SettingsDialog from bec_widgets.utils.side_panel import SidePanel +from bec_widgets.utils.signal_classification import SignalCategory, classify_device_signal from bec_widgets.utils.toolbars.bundles import ToolbarBundle from bec_widgets.utils.toolbars.toolbar import MaterialIconAction from bec_widgets.widgets.dap.lmfit_dialog.lmfit_dialog import LMFitDialog @@ -2308,21 +2309,26 @@ class Waveform(PlotBase): readout_priority_async = self._ensure_str_list(readout_priority.get("async", [])) readout_priority_sync = self._ensure_str_list(readout_priority.get("monitored", [])) - # Iterate over all curves for curve in self.curves: if curve.config.source != "device": continue dev_name = curve.config.signal.device - if dev_name in readout_priority_async: + entry = curve.config.signal.signal + category = classify_device_signal(self.dev.get(dev_name), entry) + if category == SignalCategory.ASYNC or dev_name in readout_priority_async: self._async_curves.append(curve) if hasattr(self.scan_item, "live_data"): self._setup_async_curve(curve) found_async = True - elif dev_name in readout_priority_sync: + elif category == SignalCategory.SYNC or dev_name in readout_priority_sync: self._sync_curves.append(curve) found_sync = True else: - logger.warning("Device {dev_name} not found in readout priority list.") + logger.warning( + f"Cannot classify signal {dev_name}.{entry}: no signal info and " + "device not found in readout priority list." + ) + continue # Determine the mode of the scan if found_async and found_sync: mode = "mixed" diff --git a/tests/unit_tests/test_signal_classification.py b/tests/unit_tests/test_signal_classification.py new file mode 100644 index 00000000..b747d38e --- /dev/null +++ b/tests/unit_tests/test_signal_classification.py @@ -0,0 +1,78 @@ +from types import SimpleNamespace + +from bec_widgets.utils.signal_classification import ( + SignalCategory, + classify_device_signal, + classify_signal_info, +) + + +def _info(signal_class=None, role=None, with_embedded=False): + info = {"signal_class": signal_class, "component_name": "comp", "obj_name": "dev_comp"} + if with_embedded: + info["describe"] = {"signal_info": {"role": role or "main", "signals": [["comp", 5]]}} + return info + + +def _device(signals: dict): + return SimpleNamespace(_info={"signals": signals}) + + +def test_sync_signal_classes_are_sync(): + for cls in ("Signal", "EpicsSignal", "EpicsSignalRO", "SetableSignal", "ReadOnlySignal"): + assert classify_signal_info(_info(signal_class=cls)) == SignalCategory.SYNC + + +def test_async_signal_classes_are_async(): + for cls in ("AsyncSignal", "AsyncMultiSignal", "DynamicSignal"): + assert classify_signal_info(_info(signal_class=cls)) == SignalCategory.ASYNC + + +def test_dynamic_signal_subclass_detected_via_embedded_info(): + """A DynamicSignal subclass serializes its concrete class name; the + embedded signal_info block must still classify it as async.""" + info = _info(signal_class="MyBeamlineAsyncSignal", with_embedded=True) + assert classify_signal_info(info) == SignalCategory.ASYNC + + +def test_non_curve_roles_are_unknown(): + for role in ("preview", "diagnostic", "file_event", "progress"): + info = _info(signal_class="PreviewSignal", role=role, with_embedded=True) + assert classify_signal_info(info) == SignalCategory.UNKNOWN + + +def test_missing_or_empty_info_is_unknown(): + assert classify_signal_info(None) == SignalCategory.UNKNOWN + assert classify_signal_info({}) == SignalCategory.UNKNOWN + assert classify_signal_info({"signal_class": None}) == SignalCategory.UNKNOWN + + +def test_monitored_device_with_mixed_signals(): + """One (monitored) device exposing both + a synchronous and an asynchronous signal must classify per signal, not + per device.""" + + device = _device( + { + "readback": _info(signal_class="Signal"), + "stream": _info(signal_class="AsyncSignal"), + "multi": _info(signal_class="AsyncMultiSignal"), + } + ) + assert classify_device_signal(device, "readback") == SignalCategory.SYNC + assert classify_device_signal(device, "stream") == SignalCategory.ASYNC + assert classify_device_signal(device, "multi") == SignalCategory.ASYNC + + +def test_entry_resolved_via_obj_name(): + info = _info(signal_class="AsyncSignal") + info["obj_name"] = "dev1_stream" + device = _device({"stream": info}) + assert classify_device_signal(device, "dev1_stream") == SignalCategory.ASYNC + + +def test_unresolvable_device_or_entry_is_unknown(): + assert classify_device_signal(None, "x") == SignalCategory.UNKNOWN + assert classify_device_signal(SimpleNamespace(), "x") == SignalCategory.UNKNOWN + assert classify_device_signal(_device({}), "missing") == SignalCategory.UNKNOWN + assert classify_device_signal(_device({"a": _info("Signal")}), "") == SignalCategory.UNKNOWN diff --git a/tests/unit_tests/test_waveform.py b/tests/unit_tests/test_waveform.py index 60a2356a..16940070 100644 --- a/tests/unit_tests/test_waveform.py +++ b/tests/unit_tests/test_waveform.py @@ -1642,3 +1642,48 @@ def test_history_curve_scan_not_found_returns_none(qtbot, mocked_client): # No history scans injected for this widget c = wf.plot(device_y="bpm4i", signal_y="bpm4i", scan_id="unknown_scan") assert c is None + + +def test_categorise_device_curves_monitored_device_with_async_signal(qtbot, mocked_client): + """Device listed under 'monitored' + readout priority may expose an asynchronous signal; the curve must be + classified by the signal class, not the parent device's readout priority.""" + wf = create_widget(qtbot, Waveform, client=mocked_client) + dummy_scan = create_dummy_scan_item() + wf.scan_item = dummy_scan + + # bpm4i is listed under 'monitored' in the dummy scan; give it an + # additional async signal, as a mixed device would have. + device = mocked_client.device_manager.devices["bpm4i"] + device.signals["bpm4i_stream"] = {"value": 0.0} + device._info["signals"]["bpm4i_stream"] = { + "kind_str": "hinted", + "component_name": "bpm4i_stream", + "obj_name": "bpm4i_stream", + "signal_class": "AsyncSignal", + } + + c_sync = wf.plot(arg1="bpm4i", label="sync-curve") + c_async = wf.plot(device_y="bpm4i", signal_y="bpm4i_stream", label="async-curve") + + mode = wf._categorise_device_curves() + + assert mode == "mixed" + assert c_sync in wf._sync_curves + assert c_async in wf._async_curves + + +def test_categorise_device_curves_falls_back_to_readout_priority(qtbot, mocked_client): + """When no signal info is available (e.g. history data for a removed + device), classification falls back to the scan's readout-priority lists.""" + wf = create_widget(qtbot, Waveform, client=mocked_client) + dummy_scan = create_dummy_scan_item() + wf.scan_item = dummy_scan + + c_async = wf.plot(arg1="async_device", label="fallback-curve") + # Simulate the device's signal info being unavailable. + mocked_client.device_manager.devices["async_device"]._info = {} + + wf._categorise_device_curves() + + assert c_async in wf._async_curves