From 88ab2d2e8df1dcaaad102c3b04d8e73998639252 Mon Sep 17 00:00:00 2001 From: appel_c Date: Thu, 20 Mar 2025 20:26:59 +0100 Subject: [PATCH] feat: add bec_signals with BECMessages to utils --- ophyd_devices/sim/sim_test_devices.py | 132 ++++++- ophyd_devices/utils/bec_signals.py | 491 ++++++++++++++++++++++++++ tests/test_simulation.py | 50 ++- tests/test_utils.py | 286 +++++++++++++++ 4 files changed, 957 insertions(+), 2 deletions(-) create mode 100644 ophyd_devices/utils/bec_signals.py diff --git a/ophyd_devices/sim/sim_test_devices.py b/ophyd_devices/sim/sim_test_devices.py index 88353bd..2c8f0a2 100644 --- a/ophyd_devices/sim/sim_test_devices.py +++ b/ophyd_devices/sim/sim_test_devices.py @@ -7,11 +7,17 @@ from bec_lib import messages from bec_lib.endpoints import MessageEndpoints from bec_lib.logger import bec_logger from ophyd import Component as Cpt -from ophyd import Device, DeviceStatus, OphydObject, PositionerBase, Staged +from ophyd import Device, DeviceStatus, Kind, OphydObject, PositionerBase, Staged from ophyd_devices.sim.sim_camera import SimCamera from ophyd_devices.sim.sim_positioner import SimPositioner from ophyd_devices.sim.sim_signals import SetableSignal +from ophyd_devices.utils.bec_signals import ( + DynamicSignal, + FileEventSignal, + PreviewSignal, + ProgressSignal, +) logger = bec_logger.logger @@ -311,3 +317,127 @@ class SimCameraWithStageStatus(SimCamera): thread = threading.Thread(target=_unstage_device, args=(self, status), daemon=True) thread.start() return status + + +class SimCameraWithPSIComponents(SimCamera): + """Test Device for PSIComponents""" + + preview_2d = Cpt(PreviewSignal, ndim=2, doc="2D preview signal", num_rot90=2, transpose=True) + preview_1d = Cpt(PreviewSignal, ndim=1, doc="1D preview signal") + file_event = Cpt(FileEventSignal, doc="File event signal") + progress = Cpt(ProgressSignal, doc="Progress signal") + dynamic_signal = Cpt( + DynamicSignal, doc="Dynamic signals", signal_names=["preview_2d", "preview_1d"] + ) + # TODO Handling of AsyncComponents is postponed, issue #104 is created + + # Define signals for the async components + # signal_dict = { + # "signal1": {"kind": Kind.hinted, "doc": "Signal 1"}, + # "signal2": {"kind": Kind.normal, "doc": "Signal 2"}, + # "signal3": {"kind": Kind.config, "doc": "Signal 3"}, + # "signal4": {"kind": Kind.omitted, "doc": "Signal 4"}, + # } + + # async_1d = Async1DComponent(doc="Async 1D signal", signal_def=signal_dict) + # async_2d = Async2DComponent(doc="Async 2D signal", signal_def=signal_dict) + + def __init__(self, name: str, scan_info=None, device_manager=None, **kwargs): + super().__init__(name=name, scan_info=scan_info, device_manager=device_manager, **kwargs) + self._triggers_received = 0 + + def on_stage(self): + """Stage device""" + self.file_path = self.file_utils.get_full_path( + scan_status_msg=self.scan_info.msg, name=self.name + ) + self.frames.set( + self.scan_info.msg.num_points * self.scan_info.msg.scan_parameters["frames_per_trigger"] + ).wait() + self.exp_time.set(self.scan_info.msg.scan_parameters["exp_time"]).wait() + self.burst.set(self.scan_info.msg.scan_parameters["frames_per_trigger"]).wait() + # Always emit a file event + msg = messages.FileMessage( + file_path=self.file_path, done=False, successful=False, device_name=self.name + ) + self.file_event.put(file_path=self.file_path, done=False, successful=False) + self.file_event.set(msg).wait() + self._triggers_received = 0 + + def on_trigger(self): + """Trigger device""" + self._triggers_received += 1 + + def trigger_cam(): + for _ in range(self.burst.get()): + data = self.image.get() + self.preview_2d.put(data) + # sum array in one dimension + self.preview_1d.put(np.sum(data, 1)) + self.dynamic_signal.put( + {"preview_2d": {"value": data}, "preview_1d": {"value": np.sum(data, 1)}} + ) + progress = { + "value": self._triggers_received, + "max_value": self.scan_info.msg.num_points, + "done": (self._triggers_received == self.scan_info.msg.num_points), + } + progress = messages.ProgressMessage(**progress) + self.progress.put(progress) + self._set_async_signal() + + status = self.task_handler.submit_task(trigger_cam) + return status + + def _set_async_signal(self, update_all: bool = False): + """Set the async signal values. + + Args: + update_all (bool): If True, update all signals. If False, update only hinted and normal signals. + """ + # pylint: disable=protected-access + for sig in self.async_1d._signals.values(): + if update_all is True: + sig.put(np.random.random()) + continue + if sig.kind in [Kind.hinted, Kind.normal]: + sig.put(np.random.random()) + for sig in self.async_2d._signals.values(): + if update_all is True: + sig.put(np.random.random()) + continue + if sig.kind in [Kind.hinted, Kind.normal]: + sig.put(np.random.random()) + + def on_unstage(self): + """Unstage device""" + self._triggers_received = 0 + + def on_complete(self): + """Complete device""" + + def complete_cam(): + """Complete the camera acquisition.""" + msg = messages.FileMessage( + file_path=self.file_path if self.file_path else "", + done=True, + successful=True, + device_name=self.name, + ) + self.file_event.set(msg).wait() + progress = { + "value": self._triggers_received, + "max_value": self.scan_info.msg.num_points, + "done": True, + } + progress = messages.ProgressMessage(**progress) + self.progress.set(progress).wait() + self._set_async_signal(update_all=True) + + status = self.task_handler.submit_task(complete_cam) + return status + + +if __name__ == "__main__": + cam = SimCameraWithPSIComponents(name="cam") + cam.read() diff --git a/ophyd_devices/utils/bec_signals.py b/ophyd_devices/utils/bec_signals.py new file mode 100644 index 0000000..cacf002 --- /dev/null +++ b/ophyd_devices/utils/bec_signals.py @@ -0,0 +1,491 @@ +""" +Module for custom BEC signals, that wrap around ophyd.Signal. +These signals emit BECMessage objects, which comply with the BEC message system. +""" + +from typing import Any, Callable, Literal, Type + +import numpy as np +from bec_lib import messages +from ophyd import DeviceStatus, Kind, Signal +from pydantic import ValidationError +from typeguard import typechecked + +# pylint: disable=arguments-differ +# pylint: disable=arguments-renamed +# pylint: disable=too-many-arguments +# pylint: disable=signature-differs + + +class BECMessageSignal(Signal): + """ + Custom signal class that accepts BECMessage objects as values. + Dictionaries are also accepted if convertible to the correct BECMessage type. + """ + + def __init__( + self, + name: str, + *, + bec_message_type: Type[messages.BECMessage], + value: messages.BECMessage | dict | None = None, + kind: Kind | str = Kind.omitted, + signal_metadata: dict | None = None, + **kwargs, + ): + """ + Create a new BECMessageSignal object. + + Args: + name (str) : The name of the signal. + bec_message_type: type[BECMessage] : The type of BECMessage to accept as values. + value (BECMessage | dict | None) : The value of the signal. Defaults to None. + kind (Kind | str) : The kind of the signal. Defaults to Kind.omitted. + """ + if isinstance(value, dict): + value = bec_message_type(**value) + if value and not isinstance(value, bec_message_type): + raise ValueError( + f"Value must be a {bec_message_type.__name__} or a dict for signal {name}" + ) + self.signal_metadata = signal_metadata + self._bec_message_type = bec_message_type + kwargs.pop("dtype", None) # Ignore dtype if specified + kwargs.pop("shape", None) # Ignore shape if specified + super().__init__(name=name, value=value, shape=(), dtype=None, kind=kind, **kwargs) + + def describe(self): + out = super().describe() + out[self.name]["signal_metadata"] = self.signal_metadata or {} + return out + + @property + def source_name(self) -> str: + """ + Get the source name of the signal. + + Returns: + str: The source name of the signal. + """ + return f"BECMessageSignal:{self.name}" + + def put(self, value: messages.BECMessage | dict | None = None, **kwargs) -> None: + """ + Put method for BECMessageSignal. + + If value is set to None, BEC's callback will not update REDIS. + + Args: + value (BECMessage | dict | None) : The value to put. + """ + if isinstance(value, dict): + value = self._bec_message_type(**value) + if value and not isinstance(value, self._bec_message_type): + raise ValueError( + f"Value must be a {self._bec_message_type.__name__}" + f" or a dict for signal {self.name}" + ) + return super().put(value, **kwargs) + + def set(self, value: messages.BECMessage | dict | None = None, **kwargs) -> DeviceStatus: + """ + Set method for BECMessageSignal. + + If value is set to None, BEC's callback will not update REDIS. + + Args: + value (BECMessage | dict | None) : The value to put. + """ + self.put(value, **kwargs) + status = DeviceStatus(device=self) + status.set_finished() + return status + + def _infer_value_kind(self, inference_func: Any) -> Any: + return self._bec_message_type.__name__ + + +class ProgressSignal(BECMessageSignal): + """Signal to emit progress updates.""" + + def __init__( + self, *, name: str, value: messages.ProgressMessage | dict | None = None, **kwargs + ): + """ + Create a new ProgressSignal object. + + Args: + name (str) : The name of the signal. + value (ProgressMessage | dict | None) : The initial value of the signal. Defaults to None. + """ + kwargs.pop("kind", None) # Ignore kind if specified + super().__init__( + name=name, + value=value, + bec_message_type=messages.ProgressMessage, + kind=Kind.omitted, + **kwargs, + ) + + def put( + self, + msg: messages.ProgressMessage | dict | None = None, + *, + value: float | None = None, + max_value: float | None = None, + done: bool | None = None, + metadata: dict | None = None, + **kwargs, + ) -> None: + """ + Put method for ProgressSignal. + + If msg is provided, it will be directly set as ProgressMessage. + Dictionaries are accepted and will be converted. + Otherwise, at least value, max_value and done must be provided. + + Args: + msg (ProgressMessage | dict | None) : The progress message. + value (float) : The current progress value. + max_value (float) : The maximum progress value. + done (bool) : Whether the progress is done. + metadata (dict | None) : Additional metadata + """ + # msg is ProgressMessage or dict + if isinstance(msg, (messages.ProgressMessage, dict)): + return super().put(msg, **kwargs) + try: + msg = messages.ProgressMessage( + value=value, max_value=max_value, done=done, metadata=metadata + ) + except ValidationError as exc: + raise ValueError(f"Error setting signal {self.name}: {exc}") from exc + return super().put(msg, **kwargs) + + def set( + self, + msg: messages.ProgressMessage | dict | None = None, + *, + value: float | None = None, + max_value: float | None = None, + done: bool | None = None, + metadata: dict | None = None, + **kwargs, + ) -> DeviceStatus: + """ + Set method for ProgressSignal. + + If msg is provided, it will be directly set as ProgressMessage. + Dictionaries are accepted and will be converted. + Otherwise, at least value, max_value and done must be provided. + + Args: + msg (ProgressMessage | dict | None) : The progress message. + value (float) : The current progress value. + max_value (float) : The maximum progress value. + done (bool) : Whether the progress is done. + metadata (dict | None) : Additional metadata + """ + self.put(msg=msg, value=value, max_value=max_value, done=done, metadata=metadata, **kwargs) + status = DeviceStatus(device=self) + status.set_finished() + return status + + +class FileEventSignal(BECMessageSignal): + """Signal to emit file events.""" + + def __init__(self, *, name: str, value: messages.FileMessage | dict | None = None, **kwargs): + """ + Create a new FileEventSignal object. + + Args: + name (str) : The name of the signal. + value (FileMessage | dict | None) : The initial value of the signal. Defaults to None. + kind (Kind | str) : The kind of the signal. Defaults to Kind.omitted. + """ + kwargs.pop("kind", None) # Ignore kind if specified + super().__init__( + name=name, + value=value, + bec_message_type=messages.FileMessage, + kind=Kind.omitted, + **kwargs, + ) + + def put( + self, + msg: messages.FileMessage | dict | None = None, + *, + file_path: str | None = None, + done: bool | None = None, + successful: bool | None = None, + device_name: str | None = None, + file_type: str = "h5", + hinted_h5_entries: dict[str, str] | None = None, + metadata: dict | None = None, + **kwargs, + ) -> None: + """ + Put method for FileEventSignal. + + If msg is provided, it will be directly set as FileMessage. + Dictionaries are accepted and will be converted. + Otherwise, at least file_path, done and successful must be provided. + + Args: + msg (FileMessage | dict | None) : The file event message. + file_path (str) : The path of the file. + done (bool) : Whether the file event is done. + successful (bool) : Whether the file event was successful. + device_name (str | None) : The name of the device. + file_type (str | None) : The type of the file. + hinted_h5_entries (dict[str, str] | None): The hinted h5 entries. + metadata (dict | None) : Additional metadata. + """ + if isinstance(msg, (messages.FileMessage, dict)): + return super().put(msg, **kwargs) + # kwargs provided + try: + msg = messages.FileMessage( + file_path=file_path, + done=done, + successful=successful, + device_name=device_name, + file_type=file_type, + hinted_h5_entries=hinted_h5_entries, + metadata=metadata, + ) + except ValidationError as exc: + raise ValueError(f"Error setting signal {self.name}: {exc}") from exc + return super().put(msg, **kwargs) + + # pylint: disable=arguments-differ + def set( + self, + msg: messages.FileMessage | dict | None = None, + *, + file_path: str | None = None, + done: bool | None = None, + successful: bool | None = None, + device_name: str | None = None, + file_type: str = "h5", + hinted_h5_entries: dict[str, str] | None = None, + metadata: dict | None = None, + **kwargs, + ) -> DeviceStatus: + """ + Set method for FileEventSignal. + + If msg is provided, it will be directly set as FileMessage. + Dictionaries are accepted and will be converted. + Otherwise, at least file_path, done and successful must be provided. + + Args: + msg (FileMessage | dict | None) : The file event message. + file_path (str) : The path of the file. + done (bool) : Whether the file event is done. + successful (bool) : Whether the file event was successful. + device_name (str | None) : The name of the device. + file_type (str | None) : The type of the file. + hinted_h5_entries (dict[str, str] | None): The hinted h5 entries. + metadata (dict | None) : Additional metadata. + """ + self.put( + msg=msg, + file_path=file_path, + done=done, + successful=successful, + device_name=device_name, + file_type=file_type, + hinted_h5_entries=hinted_h5_entries, + metadata=metadata, + **kwargs, + ) + status = DeviceStatus(device=self) + status.set_finished() + return status + + +class PreviewSignal(BECMessageSignal): + """Signal to emit preview data.""" + + def __init__( + self, + *, + name: str, + ndim: Literal[1, 2], + num_rot90: Literal[0, 1, 2, 3] = 0, + transpose: bool = False, + value: dict | None = None, + **kwargs, + ): + """ + Create a new FileEventSignal object. + + Args: + name (str) : The name of the signal. + ndim (Literal[1, 2]) : The number of dimensions. + value (DeviceMonitorMessage | dict | None) : The initial value of the signal. Defaults to None. + """ + kwargs.pop("kind", None) + super().__init__( + name=name, + value=value, + bec_message_type=messages.DevicePreviewMessage, + kind=Kind.omitted, + signal_metadata={"ndim": ndim, "num_rot90": num_rot90, "transpose": transpose}, + **kwargs, + ) + + # pylint: disable=signature-differs + def put( + self, + value: list | np.ndarray | dict | messages.DevicePreviewMessage, + *, + metadata: dict | None = None, + **kwargs, + ) -> None: + """ + Put method for Preview1DSignal. + + If value is a DeviceMonitor1DMessage, it will be directly set, + if value is a dict, it will be converted to a DeviceMonitor1DMessage. + + Args: + value (list | np.ndarray | dict | self._bec_message_type): The preview data. Must be 1D. + metadata (dict | None): Additional metadata. If dict or self._bec_message_type is passed, it will be ignored. + """ + if isinstance(value, self._bec_message_type): + return super().put(value, **kwargs) + if isinstance(value, dict): + value["device"] = self.parent.name + value["signal"] = self.name + return super().put(value, **kwargs) + device = self.parent.name + if isinstance(value, list): + value = np.array(value) + try: + msg = self._bec_message_type( + data=value, device=device, signal=self.name, metadata=metadata + ) + except ValidationError as exc: + raise ValueError(f"Error setting signal {self.name}: {exc}") from exc + super().put(msg, **kwargs) + + # pylint: disable=signature-differs + def set( + self, value: list | np.ndarray | dict, *, metadata: dict | None = None, **kwargs + ) -> DeviceStatus: + """ + Put method for Preview1DSignal. + + If value is a DeviceMonitor1DMessage, it will be directly set, + if value is a dict, it will be converted to a DeviceMonitor1DMessage. + + Args: + value (list | np.ndarray | dict | self._bec_message_type) : The preview data. Must be 1D. + metadata (dict | None) : Additional metadata. If dict or self._bec_message_type is passed, it will be ignored. + """ + self.put(value=value, metadata=metadata, **kwargs) + status = DeviceStatus(device=self) + status.set_finished() + return status + + +class DynamicSignal(BECMessageSignal): + """Signal to emit dynamic device data.""" + + @typechecked + def __init__( + self, + *, + name: str, + signal_names: list[str] | Callable[[], list[str]], + value: messages.DeviceMessage | dict | None = None, + **kwargs, + ): + """ + Create a new DynamicSignal object. + + Args: + name (str) : The name of the signal. + signal_names (list[str] | Callable) : The names of all signals. Can be a list or a callable. + value (DeviceMessage | dict | None) : The initial value of the signal. Defaults to None. + """ + _raise = False + if callable(signal_names): + self.signal_names = signal_names() + elif isinstance(signal_names, list): + self.signal_names = signal_names + else: + _raise = True + + if _raise is True or len(self.signal_names) == 0: + raise ValueError(f"No names provided for Dynamic signal {name} via {signal_names}") + kwargs.pop("kind", None) # Ignore kind if specified + super().__init__( + name=name, + value=value, + bec_message_type=messages.DeviceMessage, + kind=Kind.omitted, + **kwargs, + ) + + def put( + self, + value: messages.DeviceMessage | dict[str, dict[Literal["value", "timestamp"], Any]], + *, + metadata: dict | None = None, + **kwargs, + ) -> None: + """ + Put method for DynamicSignal. + + All signal names must be defined upon signal creation via signal_names_list. + If value is a DeviceMessage, it will be directly set, + if value is a dict, it will be converted to a DeviceMessage. + + Args: + value (dict | DeviceMessage) : The dynamic device data. + metadata (dict | None) : Additional metadata. + """ + if isinstance(value, messages.DeviceMessage): + self._check_signals(value) + return super().put(value, **kwargs) + try: + msg = messages.DeviceMessage(signals=value, metadata=metadata) + except ValidationError as exc: + raise ValueError(f"Error setting signal {self.name}: {exc}") from exc + self._check_signals(msg) + return super().put(msg, **kwargs) + + def _check_signals(self, msg: messages.DeviceMessage) -> None: + """Check if all signals are valid.""" + if any(name not in self.signal_names for name in msg.signals): + raise ValueError( + f"Invalid signal name in message {list(msg.signals.keys())} for signals {self.signal_names}" + ) + + def set( + self, + value: messages.DeviceMessage | dict[str, dict[Literal["value"], Any]], + *, + metadata: dict | None = None, + **kwargs, + ) -> DeviceStatus: + """ + Set method for DynamicSignal. + + All signal names must be defined upon signal creation via signal_names_list. + If value is a DeviceMessage, it will be directly set, + if value is a dict, it will be converted to a DeviceMessage. + + Args: + value (dict | DeviceMessage) : The dynamic device data. + metadata (dict | None) : Additional metadata. + """ + self.put(value, metadata=metadata, **kwargs) + status = DeviceStatus(device=self) + status.set_finished() + return status diff --git a/tests/test_simulation.py b/tests/test_simulation.py index 1d6390a..07b80f6 100644 --- a/tests/test_simulation.py +++ b/tests/test_simulation.py @@ -4,7 +4,6 @@ import os import threading import time -from types import SimpleNamespace from unittest import mock import h5py @@ -32,11 +31,16 @@ from ophyd_devices.sim.sim_frameworks.stage_camera_proxy import StageCameraProxy from ophyd_devices.sim.sim_monitor import SimMonitor, SimMonitorAsync from ophyd_devices.sim.sim_positioner import SimLinearTrajectoryPositioner, SimPositioner from ophyd_devices.sim.sim_signals import ReadOnlySignal +from ophyd_devices.sim.sim_test_devices import SimCameraWithPSIComponents from ophyd_devices.sim.sim_utils import H5Writer, LinearTrajectory from ophyd_devices.sim.sim_waveform import SimWaveform from ophyd_devices.tests.utils import get_mock_scan_info from ophyd_devices.utils.bec_device_base import BECDevice, BECDeviceBase +# pylint: disable=protected-access +# pylint: disable=no-member +# pylint: disable=too-many-arguments + @pytest.fixture(scope="function") def waveform(name="waveform"): @@ -832,3 +836,47 @@ def test_waveform_send_async_update(waveform, mode, index, expected_md): args, kwargs = mock_xadd.call_args msg = args[1]["data"] assert msg.metadata == expected_md + + +##################################### +### Test PSiComponent test device ### +##################################### + + +@pytest.fixture +def test_device(): + dev = SimCameraWithPSIComponents(name="test_device") + yield dev + + +def test_simulation_sim_camera_with_psi_component(test_device): + """Test the simulation test device with PSI components.""" + assert test_device.name == "test_device" + assert all( + element in test_device._signals + for element in [ + "preview_2d", + "preview_1d", + "file_event", + "progress", + "dynamic_signal", + # "async_1d", + # "async_2d", + ] + ) + # No signals are shown when read is called on the device + assert test_device.read() == {} + ### Commented out because the async signals are not implemented yet, cf. issue #104 + + # Hinted and normal signals + # assert list(test_device.async_1d.read().keys()) == [ + # "test_device_async_1d_signal1", + # "test_device_async_1d_signal2", + # ] + # assert list(test_device.async_2d.read().keys()) == [ + # "test_device_async_2d_signal1", + # "test_device_async_2d_signal2", + # ] + # # Config signals + # assert "test_device_async_1d_signal3" in test_device.async_1d.read_configuration() + # assert "test_device_async_2d_signal3" in test_device.async_2d.read_configuration() diff --git a/tests/test_utils.py b/tests/test_utils.py index 55e8519..9c5173e 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,9 +1,18 @@ import threading import time +import numpy as np import pytest +from bec_lib import messages from ophyd import Device +from ophyd_devices.utils.bec_signals import ( + BECMessageSignal, + DynamicSignal, + FileEventSignal, + PreviewSignal, + ProgressSignal, +) from ophyd_devices.utils.psi_device_base_utils import ( FileHandler, TaskHandler, @@ -12,6 +21,13 @@ from ophyd_devices.utils.psi_device_base_utils import ( TaskStatus, ) +# pylint: disable=protected-access +# pylint: disable=redefined-outer-name + +########################################## +######### Test Task Handler ############ +########################################## + @pytest.fixture def file_handler(): @@ -180,3 +196,273 @@ def test_utils_task_handler_shutdown(task_handler): assert status1.state == TaskState.KILLED assert status2.state == TaskState.KILLED assert status1.exception().__class__ == TaskKilledError + + +########################################## +######### Test PSI cusomt signals ###### +########################################## + + +def test_utils_bec_message_signal(): + """Test BECMessageSignal""" + dev = Device(name="device") + signal = BECMessageSignal( + name="bec_message_signal", + bec_message_type=messages.GUIInstructionMessage, + value=None, + parent=dev, + ) + assert signal.parent == dev + assert signal._bec_message_type == messages.GUIInstructionMessage + assert signal._readback is None + assert signal.name == "bec_message_signal" + assert signal.describe() == { + "bec_message_signal": { + "source": "BECMessageSignal:bec_message_signal", + "dtype": "GUIInstructionMessage", + "shape": [], + "signal_metadata": {}, + } + } + # Put works with Message + msg = messages.GUIInstructionMessage(action="image", parameter={"gui_id": "test"}) + signal.put(msg) + reading = signal.read() + assert reading[signal.name]["value"] == msg + # set works with dict, should call put + msg_dict = {"action": "image", "parameter": {"gui_id": "test"}} + status = signal.set(msg_dict) + assert status.done is True + reading = signal.read() + assert reading[signal.name]["value"] == msg + # Put fails with wrong type + with pytest.raises(ValueError): + signal.put("wrong_type") + # Put fails with wrong dict + with pytest.raises(ValueError): + signal.put({"wrong_key": "wrong_value"}) + + +def test_utils_dynamic_signal(): + """Test DynamicSignal""" + dev = Device(name="device") + signal = DynamicSignal( + name="dynamic_signal", signal_names=["sig1", "sig2"], value=None, parent=dev + ) + assert signal.parent == dev + assert signal._bec_message_type == messages.DeviceMessage + assert signal._readback is None + assert signal.name == "dynamic_signal" + assert signal.signal_names == ["sig1", "sig2"] + assert signal.describe() == { + "dynamic_signal": { + "source": "BECMessageSignal:dynamic_signal", + "dtype": "DeviceMessage", + "shape": [], + "signal_metadata": {}, + } + } + + # Put works with Message + msg_dict = {"sig1": {"value": 1}, "sig2": {"value": 2}} + msg = messages.DeviceMessage(signals=msg_dict) + signal.put(msg) + reading = signal.read() + assert reading[signal.name]["value"] == msg + # Set works with dict + status = signal.set(msg_dict) + assert status.done is True + reading = signal.read() + assert reading[signal.name]["value"] == msg + # Put fails with wrong type + with pytest.raises(ValueError): + signal.put("wrong_type") + # Put fails with wrong dict + with pytest.raises(ValueError): + signal.put({"wrong_key": "wrong_value"}) + + +def test_utils_file_event_signal(): + """Test FileEventSignal""" + dev = Device(name="device") + signal = FileEventSignal(name="file_event_signal", value=None, parent=dev) + assert signal.parent == dev + assert signal._bec_message_type == messages.FileMessage + assert signal._readback is None + assert signal.name == "file_event_signal" + assert signal.describe() == { + "file_event_signal": { + "source": "BECMessageSignal:file_event_signal", + "dtype": "FileMessage", + "shape": [], + "signal_metadata": {}, + } + } + # Test put works with FileMessage + msg_dict = {"file_path": "/path/to/another/file.txt", "done": False, "successful": True} + msg = messages.FileMessage(**msg_dict) + signal.put(msg) + reading = signal.read() + assert reading[signal.name]["value"] == msg + # Test put works with dict + signal.put(msg_dict) + reading = signal.read() + assert reading[signal.name]["value"] == msg + # Test set with kwargs, should call put + status = signal.set(file_path="/path/to/another/file.txt", done=False, successful=True) + assert status.done is True + reading = signal.read() + assert reading[signal.name]["value"] == msg + # Test put fails with wrong type + with pytest.raises(ValueError): + signal.put(1) + # Test put fails with wrong dict + with pytest.raises(ValueError): + signal.put({"wrong_key": "wrong_value"}) + + +def test_utils_preview_1d_signal(): + """Test Preview1DSignal""" + dev = Device(name="device") + signal = PreviewSignal(name="preview_1d_signal", ndim=1, value=None, parent=dev) + assert signal.signal_metadata.get("ndim") == 1 + assert signal.parent == dev + assert signal._bec_message_type == messages.DevicePreviewMessage + assert signal._readback is None + assert signal.name == "preview_1d_signal" + assert signal.describe() == { + "preview_1d_signal": { + "source": "BECMessageSignal:preview_1d_signal", + "dtype": "DevicePreviewMessage", + "shape": [], + "signal_metadata": {"ndim": 1, "num_rot90": 0, "transpose": False}, + } + } + # Put works with Message + msg_dict = {"device": dev.name, "data": np.array([1, 2, 3]), "signal": "preview_1d_signal"} + msg = messages.DevicePreviewMessage(**msg_dict) + signal.put(msg) + reading = signal.read() + assert reading[signal.name]["value"].model_dump(exclude="timestamp") == msg.model_dump( + exclude="timestamp" + ) + # Put works with dict + signal.put(msg_dict) + reading = signal.read() + assert reading[signal.name]["value"].model_dump(exclude="timestamp") == msg.model_dump( + exclude="timestamp" + ) + # Put works with value + status = signal.set(msg_dict["data"]) + assert status.done is True + reading = signal.read() + assert reading[signal.name]["value"].model_dump(exclude="timestamp") == msg.model_dump( + exclude="timestamp" + ) + # Put works with value + signal.put(msg_dict["data"]) + reading = signal.read() + assert reading[signal.name]["value"].model_dump(exclude="timestamp") == msg.model_dump( + exclude="timestamp" + ) + # Put fails with wrong type + with pytest.raises(ValueError): + signal.put(1) + # Put fails with wrong dict + with pytest.raises(ValueError): + signal.put({"wrong_key": "wrong_value"}) + + +def test_utils_preview_2d_signal(): + """Test Preview2DSignal""" + dev = Device(name="device") + signal = PreviewSignal(name="preview_2d_signal", ndim=2, value=None, parent=dev) + assert signal.signal_metadata.get("ndim") == 2 + assert signal.parent == dev + assert signal._bec_message_type == messages.DevicePreviewMessage + assert signal._readback is None + assert signal.name == "preview_2d_signal" + assert signal.describe() == { + "preview_2d_signal": { + "source": "BECMessageSignal:preview_2d_signal", + "dtype": "DevicePreviewMessage", + "shape": [], + "signal_metadata": {"ndim": 2, "num_rot90": 0, "transpose": False}, + } + } + # Put works with Message + msg_dict = { + "device": dev.name, + "data": np.array([[1, 2, 3], [4, 5, 6]]), + "signal": "preview_2d_signal", + } + msg = messages.DevicePreviewMessage(**msg_dict) + signal.put(msg) + reading = signal.read() + assert reading[signal.name]["value"].model_dump(exclude="timestamp") == msg.model_dump( + exclude="timestamp" + ) + # Put works with dict + signal.put(msg_dict) + reading = signal.read() + assert reading[signal.name]["value"].model_dump(exclude="timestamp") == msg.model_dump( + exclude="timestamp" + ) + # Put works with value + status = signal.set(msg_dict["data"]) + assert status.done is True + reading = signal.read() + assert reading[signal.name]["value"].model_dump(exclude="timestamp") == msg.model_dump( + exclude="timestamp" + ) + # Put works with value + signal.put(msg_dict["data"]) + reading = signal.read() + assert reading[signal.name]["value"].model_dump(exclude="timestamp") == msg.model_dump( + exclude="timestamp" + ) + # Put fails with wrong type + with pytest.raises(ValueError): + signal.put(1) + # Put fails with wrong dict + with pytest.raises(ValueError): + signal.put({"wrong_key": "wrong_value"}) + + +def test_utils_progress_signal(): + """Test ProgressSignal""" + dev = Device(name="device") + signal = ProgressSignal(name="progress_signal", value=None, parent=dev) + assert signal.parent == dev + assert signal._bec_message_type == messages.ProgressMessage + assert signal._readback is None + assert signal.name == "progress_signal" + assert signal.describe() == { + "progress_signal": { + "source": "BECMessageSignal:progress_signal", + "dtype": "ProgressMessage", + "shape": [], + "signal_metadata": {}, + } + } + # Put works with Message + msg = messages.ProgressMessage(value=1, max_value=10, done=False) + signal.put(msg) + reading = signal.read() + assert reading[signal.name]["value"] == msg + # Put works with dict + msg_dict = {"value": 1, "max_value": 10, "done": False} + signal.put(msg_dict) + reading = signal.read() + assert reading[signal.name]["value"] == msg + # Works with kwargs + status = signal.set(value=1, max_value=10, done=False) + assert status.done is True + reading = signal.read() + assert reading[signal.name]["value"] == msg + # Put fails with wrong type + with pytest.raises(ValueError): + signal.put(1) + # Put fails with wrong dict + with pytest.raises(ValueError): + signal.put({"wrong_key": "wrong_value"})