From 609010d6825e0b6c46cd7a60abfa30235b393962 Mon Sep 17 00:00:00 2001 From: wakonig_k Date: Fri, 28 Nov 2025 23:01:37 +0100 Subject: [PATCH] feat(signal): add signal normalization method and corresponding tests --- ophyd_devices/utils/bec_signals.py | 23 +++++++++++++++++++ tests/test_utils.py | 36 ++++++++++++++++++++++++++++++ 2 files changed, 59 insertions(+) diff --git a/ophyd_devices/utils/bec_signals.py b/ophyd_devices/utils/bec_signals.py index c4eaaeb..b86aac5 100644 --- a/ophyd_devices/utils/bec_signals.py +++ b/ophyd_devices/utils/bec_signals.py @@ -777,6 +777,7 @@ class DynamicSignal(BECMessageSignal): f"Signal {self.name} not found in message {list(msg.signals.keys())}" ) return + self._normalize_signals(msg) available_signals = [f"{self.name}_{signal_name}" for signal_name, _ in self.signals] if self.strict_signal_validation: if set(msg.signals.keys()) != set(available_signals): @@ -798,6 +799,28 @@ class DynamicSignal(BECMessageSignal): # Add here validation for async update # TODO #629 Issue in BEC: Validate async_update --> bec_lib + def _normalize_signals(self, msg: messages.DeviceMessage) -> None: + """ + Normalize signal names in the message to include the group name as prefix. + For a device 'samx' and signal component 'mysignal' with sub-signals 'a', 'b', 'c', + the expected signal names are either + 'samx_mysignal_a', 'samx_mysignal_b', 'samx_mysignal_c' + or just + 'a', 'b', 'c' + This method normalizes the latter case to the former. + + Args: + msg (DeviceMessage): The device message to normalize. + """ + prefix = f"{self.name}_" + normalized_signals = {} + for signal_name, data in msg.signals.items(): + if signal_name.startswith(prefix): + normalized_signals[signal_name] = data + else: + normalized_signals[f"{prefix}{signal_name}"] = data + msg.signals = normalized_signals + def set( self, value: messages.DeviceMessage | dict[str, dict[Literal["value"], Any]], diff --git a/tests/test_utils.py b/tests/test_utils.py index 91374cd..9bf0be0 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -7,6 +7,7 @@ import numpy as np import ophyd import pytest from bec_lib import messages +from ophyd import Component as Cpt from ophyd import Device, EpicsSignalRO, Signal from ophyd.status import WaitTimeoutError from typeguard import TypeCheckError @@ -275,6 +276,41 @@ def test_utils_bec_message_signal(): signal.put({"wrong_key": "wrong_value"}) +@pytest.mark.parametrize( + "input_msg, output_msg", + [ + ( + messages.DeviceMessage( + signals={"sig1": {"value": 1}, "sig2": {"value": 2}}, metadata={"info": "test"} + ), + messages.DeviceMessage( + signals={"device_data_sig1": {"value": 1}, "device_data_sig2": {"value": 2}}, + metadata={"info": "test"}, + ), + ), + ( + messages.DeviceMessage( + signals={"device_data_sig1": {"value": 1}, "device_data_sig2": {"value": 2}}, + metadata={"info": "test"}, + ), + messages.DeviceMessage( + signals={"device_data_sig1": {"value": 1}, "device_data_sig2": {"value": 2}}, + metadata={"info": "test"}, + ), + ), + ], +) +def test_utils_signal_normalization(input_msg, output_msg): + """Test signal normalization utility in BECMessageSignal""" + + class DeviceWithSignal(Device): + data = Cpt(AsyncMultiSignal, name="data", signals=["sig1", "sig2"], ndim=0, max_size=1000) + + dev = DeviceWithSignal(name="device") + dev.data._normalize_signals(input_msg) + assert input_msg == output_msg + + def test_utils_dynamic_signal(): """Test DynamicSignal""" dev = Device(name="device")