test: add tests for new device

This commit is contained in:
2024-05-29 18:42:49 +02:00
parent 1aece61a3b
commit c5544226be

View File

@ -2,11 +2,14 @@
# pylint: disable: all # pylint: disable: all
import os import os
from types import SimpleNamespace
from unittest import mock from unittest import mock
import h5py import h5py
import numpy as np import numpy as np
import pytest import pytest
from bec_lib import messages
from bec_lib.endpoints import MessageEndpoints
from bec_server.device_server.tests.utils import DMMock from bec_server.device_server.tests.utils import DMMock
from ophyd import Device, Signal from ophyd import Device, Signal
@ -19,6 +22,7 @@ from ophyd_devices.interfaces.protocols.bec_protocols import (
) )
from ophyd_devices.sim.sim import SimCamera, SimFlyer, SimMonitor, SimPositioner from ophyd_devices.sim.sim import SimCamera, SimFlyer, SimMonitor, SimPositioner
from ophyd_devices.sim.sim_frameworks import H5ImageReplayProxy, SlitProxy from ophyd_devices.sim.sim_frameworks import H5ImageReplayProxy, SlitProxy
from ophyd_devices.sim.sim_monitor_async import SimMonitorAsync
from ophyd_devices.sim.sim_signals import ReadOnlySignal from ophyd_devices.sim.sim_signals import ReadOnlySignal
from ophyd_devices.sim.sim_utils import H5Writer from ophyd_devices.sim.sim_utils import H5Writer
from ophyd_devices.utils.bec_device_base import BECDevice, BECDeviceBase from ophyd_devices.utils.bec_device_base import BECDevice, BECDeviceBase
@ -57,6 +61,14 @@ def positioner(name="positioner"):
yield pos yield pos
@pytest.fixture(scope="function")
def async_monitor(name="async_monitor"):
"""Fixture for SimMonitorAsync."""
dm = DMMock()
mon = SimMonitorAsync(name=name, device_manager=dm)
yield mon
@pytest.fixture(scope="function") @pytest.fixture(scope="function")
def h5proxy_fixture(camera, name="h5proxy"): def h5proxy_fixture(camera, name="h5proxy"):
"""Fixture for SimCamera.""" """Fixture for SimCamera."""
@ -118,6 +130,13 @@ def test_flyer__init__(flyer):
assert isinstance(flyer, BECFlyerProtocol) assert isinstance(flyer, BECFlyerProtocol)
def test_init_async_monitor(async_monitor):
"""Test the __init__ method of SimMonitorAsync."""
assert isinstance(async_monitor, SimMonitorAsync)
assert isinstance(async_monitor, BECDeviceProtocol)
assert isinstance(async_monitor, BECScanProtocol)
@pytest.mark.parametrize("center", [-10, 0, 10]) @pytest.mark.parametrize("center", [-10, 0, 10])
def test_monitor_readback(monitor, center): def test_monitor_readback(monitor, center):
"""Test the readback method of SimMonitor.""" """Test the readback method of SimMonitor."""
@ -347,3 +366,66 @@ def test_h5writer():
assert h5_writer.data_container == [data] assert h5_writer.data_container == [data]
h5_writer.receive_data(0) h5_writer.receive_data(0)
assert h5_writer.data_container == [data, 0] assert h5_writer.data_container == [data, 0]
def test_async_monitor_stage(async_monitor):
"""Test the stage method of SimMonitorAsync."""
async_monitor.stage()
assert async_monitor.data_buffer["value"] == []
assert async_monitor.data_buffer["timestamp"] == []
def test_async_monitor_prep_random_interval(async_monitor):
"""Test the stage method of SimMonitorAsync."""
async_monitor.custom_prepare.prep_random_interval()
assert async_monitor.custom_prepare._counter == 0
assert async_monitor.current_trigger.get() == 0
assert 0 < async_monitor.custom_prepare._random_send_interval < 10
def test_async_monitor_complete(async_monitor):
"""Test the on_complete method of SimMonitorAsync."""
with (
mock.patch.object(async_monitor.custom_prepare, "_send_data_to_bec") as mock_send,
mock.patch.object(async_monitor.custom_prepare, "prep_random_interval") as mock_prep,
):
async_monitor.complete()
assert mock_send.call_count == 0
async_monitor.data_buffer["value"].append(0)
async_monitor.complete()
assert mock_send.call_count == 1
def test_async_mon_on_trigger(async_monitor):
"""Test the on_trigger method of SimMonitorAsync."""
with (mock.patch.object(async_monitor.custom_prepare, "_send_data_to_bec") as mock_send,):
async_monitor.stage()
upper_limit = async_monitor.custom_prepare._random_send_interval
for ii in range(1, upper_limit + 1):
async_monitor.custom_prepare.on_trigger()
assert async_monitor.current_trigger.get() == ii
assert mock_send.call_count == 1
def test_async_mon_send_data_to_bec(async_monitor):
"""Test the _send_data_to_bec method of SimMonitorAsync."""
async_monitor.scaninfo.scan_msg = SimpleNamespace(metadata={})
async_monitor.data_buffer.update({"value": [0, 5], "timestamp": [0, 0]})
with mock.patch.object(async_monitor.connector, "xadd") as mock_xadd:
async_monitor.custom_prepare._send_data_to_bec()
dev_msg = messages.DeviceMessage(
signals={async_monitor.readback.name: async_monitor.data_buffer},
metadata={"async_update": "extend"},
)
call = [
mock.call(
MessageEndpoints.device_async_readback(
scan_id=async_monitor.scaninfo.scan_id, device=async_monitor.name
),
{"data": dev_msg},
expire=async_monitor.custom_prepare._stream_ttl,
)
]
assert mock_xadd.mock_calls == call
assert async_monitor.data_buffer["value"] == []