feat(signal): add signal normalization method and corresponding tests

This commit is contained in:
2025-11-28 23:01:37 +01:00
committed by Christian Appel
parent c6537cd39a
commit 609010d682
2 changed files with 59 additions and 0 deletions

View File

@@ -777,6 +777,7 @@ class DynamicSignal(BECMessageSignal):
f"Signal {self.name} not found in message {list(msg.signals.keys())}"
)
return
self._normalize_signals(msg)
available_signals = [f"{self.name}_{signal_name}" for signal_name, _ in self.signals]
if self.strict_signal_validation:
if set(msg.signals.keys()) != set(available_signals):
@@ -798,6 +799,28 @@ class DynamicSignal(BECMessageSignal):
# Add here validation for async update
# TODO #629 Issue in BEC: Validate async_update --> bec_lib
def _normalize_signals(self, msg: messages.DeviceMessage) -> None:
"""
Normalize signal names in the message to include the group name as prefix.
For a device 'samx' and signal component 'mysignal' with sub-signals 'a', 'b', 'c',
the expected signal names are either
'samx_mysignal_a', 'samx_mysignal_b', 'samx_mysignal_c'
or just
'a', 'b', 'c'
This method normalizes the latter case to the former.
Args:
msg (DeviceMessage): The device message to normalize.
"""
prefix = f"{self.name}_"
normalized_signals = {}
for signal_name, data in msg.signals.items():
if signal_name.startswith(prefix):
normalized_signals[signal_name] = data
else:
normalized_signals[f"{prefix}{signal_name}"] = data
msg.signals = normalized_signals
def set(
self,
value: messages.DeviceMessage | dict[str, dict[Literal["value"], Any]],

View File

@@ -7,6 +7,7 @@ import numpy as np
import ophyd
import pytest
from bec_lib import messages
from ophyd import Component as Cpt
from ophyd import Device, EpicsSignalRO, Signal
from ophyd.status import WaitTimeoutError
from typeguard import TypeCheckError
@@ -275,6 +276,41 @@ def test_utils_bec_message_signal():
signal.put({"wrong_key": "wrong_value"})
@pytest.mark.parametrize(
"input_msg, output_msg",
[
(
messages.DeviceMessage(
signals={"sig1": {"value": 1}, "sig2": {"value": 2}}, metadata={"info": "test"}
),
messages.DeviceMessage(
signals={"device_data_sig1": {"value": 1}, "device_data_sig2": {"value": 2}},
metadata={"info": "test"},
),
),
(
messages.DeviceMessage(
signals={"device_data_sig1": {"value": 1}, "device_data_sig2": {"value": 2}},
metadata={"info": "test"},
),
messages.DeviceMessage(
signals={"device_data_sig1": {"value": 1}, "device_data_sig2": {"value": 2}},
metadata={"info": "test"},
),
),
],
)
def test_utils_signal_normalization(input_msg, output_msg):
"""Test signal normalization utility in BECMessageSignal"""
class DeviceWithSignal(Device):
data = Cpt(AsyncMultiSignal, name="data", signals=["sig1", "sig2"], ndim=0, max_size=1000)
dev = DeviceWithSignal(name="device")
dev.data._normalize_signals(input_msg)
assert input_msg == output_msg
def test_utils_dynamic_signal():
"""Test DynamicSignal"""
dev = Device(name="device")