fix: unified device message signature

This commit is contained in:
wakonig_k 2024-04-30 13:49:18 +02:00
parent edb1956e2a
commit c54dfc166f
10 changed files with 75 additions and 69 deletions

View File

@ -791,8 +791,8 @@ class AdjustableMixin:
if not limit_msg: if not limit_msg:
return [0, 0] return [0, 0]
limits = [ limits = [
limit_msg.content["signals"].get("low", 0), limit_msg.content["signals"].get("low", {}).get("value", 0),
limit_msg.content["signals"].get("high", 0), limit_msg.content["signals"].get("high", {}).get("value", 0),
] ]
return limits return limits

View File

@ -301,7 +301,7 @@ class DeviceMessage(BECMessage):
"""Message type for sending device readings from the device server """Message type for sending device readings from the device server
Args: Args:
signals (dict): Dictionary of device signals signals (dict): Dictionary containing the device signals and their values
metadata (dict, optional): Metadata to describe the conditions of the device reading metadata (dict, optional): Metadata to describe the conditions of the device reading
Examples: Examples:
@ -309,7 +309,7 @@ class DeviceMessage(BECMessage):
""" """
msg_type: ClassVar[str] = "device_message" msg_type: ClassVar[str] = "device_message"
signals: dict = Field(default_factory=dict) signals: dict[str, dict[Literal["value", "timestamp"], Any]]
class DeviceRPCMessage(BECMessage): class DeviceRPCMessage(BECMessage):

View File

@ -9,7 +9,9 @@ from bec_lib.redis_connector import MessageObject
def test_channel_monitor_callback(): def test_channel_monitor_callback():
with mock.patch("builtins.print") as mock_print: with mock.patch("builtins.print") as mock_print:
msg = messages.DeviceMessage(signals={"x": 1, "y": 2, "z": 3}, metadata={"name": "test"}) msg = messages.DeviceMessage(
signals={"x": {"value": 1}, "y": {"value": 2}}, metadata={"name": "test"}
)
msg_obj = MessageObject("test", msg) msg_obj = MessageObject("test", msg)
channel_callback(msg_obj) channel_callback(msg_obj)
mock_print.assert_called_once() mock_print.assert_called_once()

View File

@ -600,7 +600,7 @@ def test_adjustable_mixin_limits():
adj = AdjustableMixin() adj = AdjustableMixin()
adj.root = mock.MagicMock() adj.root = mock.MagicMock()
adj.root.parent.connector.get.return_value = messages.DeviceMessage( adj.root.parent.connector.get.return_value = messages.DeviceMessage(
signals={"low": -12, "high": 12}, metadata={} signals={"low": {"value": -12}, "high": {"value": 12}}, metadata={}
) )
assert adj.limits == [-12, 12] assert adj.limits == [-12, 12]
@ -624,7 +624,7 @@ def test_adjustable_mixin_set_low_limit():
adj.update_config = mock.MagicMock() adj.update_config = mock.MagicMock()
adj.root = mock.MagicMock() adj.root = mock.MagicMock()
adj.root.parent.connector.get.return_value = messages.DeviceMessage( adj.root.parent.connector.get.return_value = messages.DeviceMessage(
signals={"low": -12, "high": 12}, metadata={} signals={"low": {"value": -12}, "high": {"value": 12}}, metadata={}
) )
adj.low_limit = -20 adj.low_limit = -20
adj.update_config.assert_called_once_with({"deviceConfig": {"limits": [-20, 12]}}) adj.update_config.assert_called_once_with({"deviceConfig": {"limits": [-20, 12]}})
@ -635,7 +635,7 @@ def test_adjustable_mixin_set_high_limit():
adj.update_config = mock.MagicMock() adj.update_config = mock.MagicMock()
adj.root = mock.MagicMock() adj.root = mock.MagicMock()
adj.root.parent.connector.get.return_value = messages.DeviceMessage( adj.root.parent.connector.get.return_value = messages.DeviceMessage(
signals={"low": -12, "high": 12}, metadata={} signals={"low": {"value": -12}, "high": {"value": 12}}, metadata={}
) )
adj.high_limit = 20 adj.high_limit = 20
adj.update_config.assert_called_once_with({"deviceConfig": {"limits": [-12, 20]}}) adj.update_config.assert_called_once_with({"deviceConfig": {"limits": [-12, 20]}})

View File

@ -88,8 +88,8 @@ class ConfigUpdateHandler:
if "limits" in dev_config["deviceConfig"]: if "limits" in dev_config["deviceConfig"]:
limits = { limits = {
"low": device.obj.low_limit_travel.get(), "low": {"value": device.obj.low_limit_travel.get()},
"high": device.obj.high_limit_travel.get(), "high": {"value": device.obj.high_limit_travel.get()},
} }
self.device_manager.connector.set_and_publish( self.device_manager.connector.set_and_publish(
MessageEndpoints.device_limits(device.name), MessageEndpoints.device_limits(device.name),

View File

@ -62,8 +62,8 @@ class DSDevice(DeviceBase):
dev_config_msg = messages.DeviceMessage(signals=self.obj.read_configuration(), metadata={}) dev_config_msg = messages.DeviceMessage(signals=self.obj.read_configuration(), metadata={})
if hasattr(self.obj, "low_limit_travel") and hasattr(self.obj, "high_limit_travel"): if hasattr(self.obj, "low_limit_travel") and hasattr(self.obj, "high_limit_travel"):
limits = { limits = {
"low": self.obj.low_limit_travel.get(), "low": {"value": self.obj.low_limit_travel.get()},
"high": self.obj.high_limit_travel.get(), "high": {"value": self.obj.high_limit_travel.get()},
} }
else: else:
limits = None limits = None
@ -531,9 +531,7 @@ class DeviceManagerDS(DeviceManagerBase):
for key, val in data.items(): for key, val in data.items():
signals[key] = {"value": val[ii], "timestamp": timestamp} signals[key] = {"value": val[ii], "timestamp": timestamp}
bundle.append( bundle.append(
messages.DeviceMessage( messages.DeviceMessage(signals=signals, metadata={"point_id": ii, **metadata})
signals={obj.name: signals}, metadata={"point_id": ii, **metadata}
)
) )
ds_obj.emitted_points[metadata["scan_id"]] = max_points ds_obj.emitted_points[metadata["scan_id"]] = max_points
pipe = self.connector.pipeline() pipe = self.connector.pipeline()

View File

@ -398,7 +398,7 @@ class ScanStubs:
yield self._device_msg( yield self._device_msg(
device=device, device=device,
action="publish_data_as_read", action="publish_data_as_read",
parameter={"data": {device: data}}, parameter={"data": data},
metadata=metadata, metadata=metadata,
) )

View File

@ -1,16 +1,13 @@
# pylint: skip-file # pylint: skip-file
import contextlib
import os import os
from unittest import mock from unittest import mock
import numpy as np import numpy as np
import pytest import pytest
import yaml
import bec_lib import bec_lib
from bec_lib import DeviceManagerBase, MessageEndpoints, ServiceConfig, messages from bec_lib import MessageEndpoints, ServiceConfig, messages
from bec_lib.logger import bec_logger from bec_lib.logger import bec_logger
from bec_lib.messages import BECStatus
from bec_lib.redis_connector import MessageObject from bec_lib.redis_connector import MessageObject
from bec_lib.tests.utils import ConnectorMock from bec_lib.tests.utils import ConnectorMock
from bec_server.file_writer import FileWriterManager from bec_server.file_writer import FileWriterManager
@ -33,9 +30,10 @@ def file_writer_manager_mock():
"log_writer": {"base_path": "./"}, "log_writer": {"base_path": "./"},
}, },
) )
with mock.patch.object( with (
FileWriterManager, "_start_device_manager", return_value=None mock.patch.object(FileWriterManager, "_start_device_manager", return_value=None),
), mock.patch.object(FileWriterManager, "wait_for_service"): mock.patch.object(FileWriterManager, "wait_for_service"),
):
file_writer_manager_mock = FileWriterManager(config=config, connector_cls=connector_cls) file_writer_manager_mock = FileWriterManager(config=config, connector_cls=connector_cls)
try: try:
yield file_writer_manager_mock yield file_writer_manager_mock
@ -195,10 +193,10 @@ def test_update_async_data(file_writer_manager_mock):
def test_process_async_data_single_entry(file_writer_manager_mock): def test_process_async_data_single_entry(file_writer_manager_mock):
file_manager = file_writer_manager_mock file_manager = file_writer_manager_mock
file_manager.scan_storage["scan_id"] = ScanStorage(10, "scan_id") file_manager.scan_storage["scan_id"] = ScanStorage(10, "scan_id")
data = [{"data": messages.DeviceMessage(signals={"data": np.zeros((10, 10))})}] data = [{"data": messages.DeviceMessage(signals={"data": {"value": np.zeros((10, 10))}})}]
file_manager._process_async_data(data, "scan_id", "dev1") file_manager._process_async_data(data, "scan_id", "dev1")
assert np.isclose( assert np.isclose(
file_manager.scan_storage["scan_id"].async_data["dev1"]["data"], np.zeros((10, 10)) file_manager.scan_storage["scan_id"].async_data["dev1"]["data"]["value"], np.zeros((10, 10))
).all() ).all()

View File

@ -9,10 +9,24 @@ from bec_lib.connector import MessageObject
# pylint: disable=protected-access # pylint: disable=protected-access
def test_device_read_callback(scan_bundler_mock): @pytest.fixture()
def dummy_signal_data():
return {
"samx": {"value": 0.51, "timestamp": 1234.56},
"samx_setpoint": {"value": 0.51, "timestamp": 1234.56},
}
@pytest.fixture()
def dummy_device_data_message(dummy_signal_data):
return messages.DeviceMessage(
signals=dummy_signal_data, metadata={"scan_id": "scan_id", "readout_priority": "monitored"}
)
def test_device_read_callback(scan_bundler_mock, dummy_signal_data):
dev_msg = messages.DeviceMessage( dev_msg = messages.DeviceMessage(
signals={"samx": {"samx": 0.51, "setpoint": 0.5, "motor_is_moving": 0}}, signals=dummy_signal_data, metadata={"scan_id": "laksjd", "readout_priority": "monitored"}
metadata={"scan_id": "laksjd", "readout_priority": "monitored"},
) )
msg = MessageObject(MessageEndpoints.device_read("samx").endpoint, dev_msg) msg = MessageObject(MessageEndpoints.device_read("samx").endpoint, dev_msg)
@ -68,10 +82,9 @@ def test_wait_for_scan_id(scan_bundler_mock, scan_id, storageID, scan_msg):
sb._wait_for_scan_id(scan_id) sb._wait_for_scan_id(scan_id)
def test_add_device_to_storage_returns_without_scan_id(scan_bundler_mock): def test_add_device_to_storage_returns_without_scan_id(scan_bundler_mock, dummy_signal_data):
msg = messages.DeviceMessage( msg = messages.DeviceMessage(
signals={"samx": {"samx": 0.51, "setpoint": 0.5, "motor_is_moving": 0}}, signals=dummy_signal_data, metadata={"readout_priority": "monitored"}
metadata={"readout_priority": "monitored"},
) )
sb = scan_bundler_mock sb = scan_bundler_mock
sb._add_device_to_storage([msg], "samx", timeout_time=1) sb._add_device_to_storage([msg], "samx", timeout_time=1)
@ -87,10 +100,9 @@ def test_add_device_to_storage_returns_without_signal(scan_bundler_mock):
assert "samx" not in sb.device_storage assert "samx" not in sb.device_storage
def test_add_device_to_storage_returns_on_timeout(scan_bundler_mock): def test_add_device_to_storage_returns_on_timeout(scan_bundler_mock, dummy_signal_data):
msg = messages.DeviceMessage( msg = messages.DeviceMessage(
signals={"samx": {"samx": 0.51, "setpoint": 0.5, "motor_is_moving": 0}}, signals=dummy_signal_data, metadata={"scan_id": "scan_id", "readout_priority": "monitored"}
metadata={"scan_id": "scan_id", "readout_priority": "monitored"},
) )
sb = scan_bundler_mock sb = scan_bundler_mock
sb._add_device_to_storage([msg], "samx", timeout_time=1) sb._add_device_to_storage([msg], "samx", timeout_time=1)
@ -98,10 +110,11 @@ def test_add_device_to_storage_returns_on_timeout(scan_bundler_mock):
@pytest.mark.parametrize("scan_status", ["aborted", "closed"]) @pytest.mark.parametrize("scan_status", ["aborted", "closed"])
def test_add_device_to_storage_returns_without_scan_info(scan_bundler_mock, scan_status): def test_add_device_to_storage_returns_without_scan_info(
scan_bundler_mock, scan_status, dummy_signal_data
):
msg = messages.DeviceMessage( msg = messages.DeviceMessage(
signals={"samx": {"samx": 0.51, "setpoint": 0.5, "motor_is_moving": 0}}, signals=dummy_signal_data, metadata={"scan_id": "scan_id", "readout_priority": "monitored"}
metadata={"scan_id": "scan_id", "readout_priority": "monitored"},
) )
sb = scan_bundler_mock sb = scan_bundler_mock
sb.sync_storage["scan_id"] = {"info": {}} sb.sync_storage["scan_id"] = {"info": {}}
@ -113,30 +126,13 @@ def test_add_device_to_storage_returns_without_scan_info(scan_bundler_mock, scan
@pytest.mark.parametrize( @pytest.mark.parametrize(
"msg,scan_type", "msg,scan_type",
[ [
( ("dummy_device_data_message", "step"),
messages.DeviceMessage( ("dummy_device_data_message", "fly"),
signals={"samx": {"samx": 0.51, "setpoint": 0.5, "motor_is_moving": 0}}, ("dummy_device_data_message", "wrong"),
metadata={"scan_id": "scan_id", "readout_priority": "monitored"},
),
"step",
),
(
messages.DeviceMessage(
signals={"samx": {"samx": 0.51, "setpoint": 0.5, "motor_is_moving": 0}},
metadata={"scan_id": "scan_id", "readout_priority": "monitored"},
),
"fly",
),
(
messages.DeviceMessage(
signals={"samx": {"samx": 0.51, "setpoint": 0.5, "motor_is_moving": 0}},
metadata={"scan_id": "scan_id", "readout_priority": "monitored"},
),
"wrong",
),
], ],
) )
def test_add_device_to_storage_primary(scan_bundler_mock, msg, scan_type): def test_add_device_to_storage_primary(scan_bundler_mock, msg, scan_type, request):
msg = request.getfixturevalue(msg)
sb = scan_bundler_mock sb = scan_bundler_mock
sb.sync_storage["scan_id"] = {"info": {"scan_type": scan_type, "monitor_sync": "bec"}} sb.sync_storage["scan_id"] = {"info": {"scan_type": scan_type, "monitor_sync": "bec"}}
sb.sync_storage["scan_id"]["status"] = "open" sb.sync_storage["scan_id"]["status"] = "open"
@ -165,7 +161,11 @@ def test_add_device_to_storage_primary(scan_bundler_mock, msg, scan_type):
[ [
( (
messages.DeviceMessage( messages.DeviceMessage(
signals={"samx": {"samx": 0.51, "setpoint": 0.5, "motor_is_moving": 0}}, signals={
"samx": {"value": 0.51},
"setpoint": {"value": 0.5},
"motor_is_moving": {"value": 0},
},
metadata={"scan_id": "scan_id"}, metadata={"scan_id": "scan_id"},
), ),
"fly", "fly",
@ -173,7 +173,9 @@ def test_add_device_to_storage_primary(scan_bundler_mock, msg, scan_type):
( (
messages.DeviceMessage( messages.DeviceMessage(
signals={ signals={
"flyer": {"flyer": 0.51, "flyer_setpoint": 0.5, "flyer_motor_is_moving": 0} "flyer": {"value": 0.51},
"flyer_setpoint": {"value": 0.5},
"flyer_motor_is_moving": {"value": 0},
}, },
metadata={"scan_id": "scan_id"}, metadata={"scan_id": "scan_id"},
), ),
@ -204,7 +206,11 @@ def test_add_device_to_storage_primary_flyer(scan_bundler_mock, msg, scan_type):
[ [
( (
messages.DeviceMessage( messages.DeviceMessage(
signals={"samx": {"samx": 0.51, "setpoint": 0.5, "motor_is_moving": 0}}, signals={
"samx": {"value": 0.51},
"setpoint": {"value": 0.5},
"motor_is_moving": {"value": 0},
},
metadata={"scan_id": "scan_id", "readout_priority": "baseline"}, metadata={"scan_id": "scan_id", "readout_priority": "baseline"},
), ),
"step", "step",
@ -402,7 +408,6 @@ def test_status_modification(scan_bundler_mock):
scan_id="6ff7a89a-79e5-43ad-828b-c1e1aeed5803", scan_id="6ff7a89a-79e5-43ad-828b-c1e1aeed5803",
status="closed", status="closed",
info={ info={
"readout_priority": "monitored",
"DIID": 4, "DIID": 4,
"RID": "a53538b4-79f3-4132-91b5-d044e438f460", "RID": "a53538b4-79f3-4132-91b5-d044e438f460",
"scan_id": "3ea07f69-b0ee-44fa-8451-b85824a37397", "scan_id": "3ea07f69-b0ee-44fa-8451-b85824a37397",
@ -421,7 +426,6 @@ def test_status_modification(scan_bundler_mock):
scan_id="6ff7a89a-79e5-43ad-828b-c1e1aeed5803", scan_id="6ff7a89a-79e5-43ad-828b-c1e1aeed5803",
status="open", status="open",
info={ info={
"readout_priority": "monitored",
"DIID": 4, "DIID": 4,
"RID": "a53538b4-79f3-4132-91b5-d044e438f460", "RID": "a53538b4-79f3-4132-91b5-d044e438f460",
"scan_id": "3ea07f69-b0ee-44fa-8451-b85824a37397", "scan_id": "3ea07f69-b0ee-44fa-8451-b85824a37397",
@ -476,7 +480,7 @@ def test_initialize_scan_container(scan_bundler_mock, scan_msg):
[ [
[ [
messages.DeviceMessage( messages.DeviceMessage(
signals={"samx": {"samx": 0.51, "setpoint": 0.5, "motor_is_moving": 0}}, signals={"samx": {"value": 0.51, "timestamp": 1234.56}},
metadata={ metadata={
"scan_id": "adlk-jalskdja", "scan_id": "adlk-jalskdja",
"readout_priority": "monitored", "readout_priority": "monitored",
@ -488,7 +492,7 @@ def test_initialize_scan_container(scan_bundler_mock, scan_msg):
], ],
[ [
messages.DeviceMessage( messages.DeviceMessage(
signals={"samx": {"samx": 0.51, "setpoint": 0.5, "motor_is_moving": 0}}, signals={"samx": {"value": 0.51, "timestamp": 1234.56}},
metadata={ metadata={
"scan_id": "adlk-jalskdjb", "scan_id": "adlk-jalskdjb",
"readout_priority": "monitored", "readout_priority": "monitored",
@ -500,7 +504,7 @@ def test_initialize_scan_container(scan_bundler_mock, scan_msg):
], ],
[ [
messages.DeviceMessage( messages.DeviceMessage(
signals={"samx": {"samx": 0.51, "setpoint": 0.5, "motor_is_moving": 0}}, signals={"samx": {"value": 0.51, "timestamp": 1234.56}},
metadata={"scan_id": "adlk-jalskdjc", "readout_priority": "monitored"}, metadata={"scan_id": "adlk-jalskdjc", "readout_priority": "monitored"},
), ),
23, 23,
@ -652,7 +656,11 @@ def test_update_monitor_signals(scan_bundler_mock):
def test_get_last_device_readback(scan_bundler_mock): def test_get_last_device_readback(scan_bundler_mock):
sb = scan_bundler_mock sb = scan_bundler_mock
dev_msg = messages.DeviceMessage( dev_msg = messages.DeviceMessage(
signals={"samx": {"samx": 0.51, "setpoint": 0.5, "motor_is_moving": 0}}, signals={
"samx": {"value": 0.51},
"setpoint": {"value": 0.5},
"motor_is_moving": {"value": 0},
},
metadata={"scan_id": "laksjd", "readout_priority": "monitored"}, metadata={"scan_id": "laksjd", "readout_priority": "monitored"},
) )
with mock.patch.object(sb, "connector") as connector_mock: with mock.patch.object(sb, "connector") as connector_mock:

View File

@ -1953,7 +1953,7 @@ def test_monitor_scan_run():
messages.DeviceInstructionMessage( messages.DeviceInstructionMessage(
device="samx", device="samx",
action="publish_data_as_read", action="publish_data_as_read",
parameter={"data": {"samx": {"rb1": {"value": 1}}}}, parameter={"data": {"rb1": {"value": 1}}},
metadata={"readout_priority": "monitored", "DIID": 11, "point_id": 0}, metadata={"readout_priority": "monitored", "DIID": 11, "point_id": 0},
), ),
messages.DeviceInstructionMessage( messages.DeviceInstructionMessage(