637 lines
23 KiB
Python
637 lines
23 KiB
Python
""" This module contains tests for the simulation devices in ophyd_devices """
|
|
|
|
# pylint: disable: all
|
|
import os
|
|
import time
|
|
from types import SimpleNamespace
|
|
from unittest import mock
|
|
|
|
import h5py
|
|
import numpy as np
|
|
import pytest
|
|
from bec_lib import messages
|
|
from bec_lib.endpoints import MessageEndpoints
|
|
from bec_server.device_server.tests.utils import DMMock
|
|
from ophyd import Device, Signal
|
|
from ophyd.status import wait as status_wait
|
|
|
|
from ophyd_devices.interfaces.protocols.bec_protocols import (
|
|
BECDeviceProtocol,
|
|
BECFlyerProtocol,
|
|
BECPositionerProtocol,
|
|
BECScanProtocol,
|
|
BECSignalProtocol,
|
|
)
|
|
from ophyd_devices.sim.sim_camera import SimCamera
|
|
from ophyd_devices.sim.sim_flyer import SimFlyer
|
|
from ophyd_devices.sim.sim_frameworks import H5ImageReplayProxy, SlitProxy
|
|
from ophyd_devices.sim.sim_monitor import SimMonitor, SimMonitorAsync
|
|
from ophyd_devices.sim.sim_positioner import SimLinearTrajectoryPositioner, SimPositioner
|
|
from ophyd_devices.sim.sim_signals import ReadOnlySignal
|
|
from ophyd_devices.sim.sim_utils import H5Writer, LinearTrajectory
|
|
from ophyd_devices.sim.sim_waveform import SimWaveform
|
|
from ophyd_devices.utils.bec_device_base import BECDevice, BECDeviceBase
|
|
|
|
|
|
@pytest.fixture(scope="function")
|
|
def waveform(name="waveform"):
|
|
"""Fixture for SimWaveform."""
|
|
dm = DMMock()
|
|
wave = SimWaveform(name=name, device_manager=dm)
|
|
yield wave
|
|
|
|
|
|
@pytest.fixture(scope="function")
|
|
def signal(name="signal"):
|
|
"""Fixture for Signal."""
|
|
sig = ReadOnlySignal(name=name, value=0)
|
|
yield sig
|
|
|
|
|
|
@pytest.fixture(scope="function")
|
|
def monitor(name="monitor"):
|
|
"""Fixture for SimMonitor."""
|
|
dm = DMMock()
|
|
mon = SimMonitor(name=name, device_manager=dm)
|
|
yield mon
|
|
|
|
|
|
@pytest.fixture(scope="function")
|
|
def camera(name="camera"):
|
|
"""Fixture for SimCamera."""
|
|
dm = DMMock()
|
|
cam = SimCamera(name=name, device_manager=dm)
|
|
cam.filewriter = mock.MagicMock()
|
|
cam.filewriter.compile_full_filename.return_value = ""
|
|
yield cam
|
|
|
|
|
|
@pytest.fixture(scope="function")
|
|
def positioner(name="positioner"):
|
|
"""Fixture for SimPositioner."""
|
|
dm = DMMock()
|
|
pos = SimPositioner(name=name, device_manager=dm)
|
|
yield pos
|
|
|
|
|
|
@pytest.fixture(scope="function")
|
|
def linear_traj_positioner(name="linear_traj_positioner"):
|
|
"""Fixture for SimLinearTrajectoryPositioner."""
|
|
dm = DMMock()
|
|
pos = SimLinearTrajectoryPositioner(name=name, device_manager=dm)
|
|
yield pos
|
|
|
|
|
|
@pytest.fixture(scope="function")
|
|
def async_monitor(name="async_monitor"):
|
|
"""Fixture for SimMonitorAsync."""
|
|
dm = DMMock()
|
|
mon = SimMonitorAsync(name=name, device_manager=dm)
|
|
yield mon
|
|
|
|
|
|
@pytest.fixture(scope="function")
|
|
def h5proxy_fixture(camera, name="h5proxy"):
|
|
"""Fixture for SimCamera."""
|
|
dm = camera.device_manager
|
|
proxy = H5ImageReplayProxy(name=name, device_manager=dm)
|
|
yield proxy, camera
|
|
|
|
|
|
@pytest.fixture(scope="function")
|
|
def slitproxy_fixture(camera, name="slit_proxy"):
|
|
"""Fixture for SimCamera."""
|
|
dm = camera.device_manager
|
|
proxy = SlitProxy(name=name, device_manager=dm)
|
|
samx = SimPositioner(name="samx", device_manager=dm)
|
|
yield proxy, camera, samx
|
|
|
|
|
|
@pytest.fixture(scope="function")
|
|
def flyer(name="flyer"):
|
|
"""Fixture for SimFlyer."""
|
|
dm = DMMock()
|
|
fly = SimFlyer(name=name, device_manager=dm)
|
|
yield fly
|
|
|
|
|
|
def test_camera_with_sim_init():
|
|
"""Test to see if the sim init parameters are passed to the device"""
|
|
dm = DMMock()
|
|
sim = SimCamera(name="sim", device_manager=dm)
|
|
assert sim.sim._model.value == "gaussian"
|
|
model = "constant"
|
|
params = {
|
|
"amplitude": 300,
|
|
"noise": "uniform",
|
|
"noise_multiplier": 1,
|
|
"hot_pixel_coords": [[0, 0], [50, 50]],
|
|
"hot_pixel_types": ["fluctuating", "constant"],
|
|
"hot_pixel_values": [2.0, 2.0],
|
|
}
|
|
sim = SimCamera(name="sim", device_manager=dm, sim_init={"model": model, "params": params})
|
|
assert sim.sim._model.value == model
|
|
assert sim.sim.params == params
|
|
|
|
|
|
def test_monitor_with_sim_init():
|
|
"""Test to see if the sim init parameters are passed to the device"""
|
|
dm = DMMock()
|
|
sim = SimMonitor(name="sim", device_manager=dm)
|
|
assert sim.sim._model._name == "constant"
|
|
model = "GaussianModel"
|
|
params = {
|
|
"amplitude": 500,
|
|
"center": 5,
|
|
"sigma": 4,
|
|
"noise": "uniform",
|
|
"noise_multiplier": 1,
|
|
"ref_motor": "samy",
|
|
}
|
|
sim = SimMonitor(name="sim", device_manager=dm, sim_init={"model": model, "params": params})
|
|
assert sim.sim._model._name == model.strip("Model").lower()
|
|
diff_keys = set(sim.sim.params.keys()) - set(params.keys())
|
|
for k in params:
|
|
assert sim.sim.params[k] == params[k]
|
|
|
|
|
|
def test_signal__init__(signal):
|
|
"""Test the BECProtocol class"""
|
|
assert isinstance(signal, BECDeviceProtocol)
|
|
assert isinstance(signal, BECSignalProtocol)
|
|
|
|
|
|
def test_monitor__init__(monitor):
|
|
"""Test the __init__ method of SimMonitor."""
|
|
assert isinstance(monitor, SimMonitor)
|
|
assert isinstance(monitor, BECSignalProtocol)
|
|
|
|
|
|
def test_camera__init__(camera):
|
|
"""Test the __init__ method of SimMonitor."""
|
|
assert isinstance(camera, SimCamera)
|
|
assert isinstance(camera, BECDeviceProtocol)
|
|
assert isinstance(camera, BECScanProtocol)
|
|
|
|
|
|
def test_positioner__init__(positioner):
|
|
"""Test the __init__ method of SimPositioner."""
|
|
assert isinstance(positioner, SimPositioner)
|
|
assert isinstance(positioner, BECDeviceProtocol)
|
|
assert isinstance(positioner, BECScanProtocol)
|
|
assert isinstance(positioner, BECPositionerProtocol)
|
|
|
|
|
|
def test_flyer__init__(flyer):
|
|
"""Test the __init__ method of SimFlyer."""
|
|
assert isinstance(flyer, SimFlyer)
|
|
assert isinstance(flyer, BECDeviceProtocol)
|
|
assert isinstance(flyer, BECScanProtocol)
|
|
assert isinstance(flyer, BECFlyerProtocol)
|
|
|
|
|
|
def test_init_async_monitor(async_monitor):
|
|
"""Test the __init__ method of SimMonitorAsync."""
|
|
assert isinstance(async_monitor, SimMonitorAsync)
|
|
assert isinstance(async_monitor, BECDeviceProtocol)
|
|
assert isinstance(async_monitor, BECScanProtocol)
|
|
|
|
|
|
@pytest.mark.parametrize("center", [-10, 0, 10])
|
|
def test_monitor_readback(monitor, center):
|
|
"""Test the readback method of SimMonitor."""
|
|
motor_pos = 0
|
|
monitor.device_manager.add_device(name="samx", value=motor_pos)
|
|
for model_name in monitor.sim.get_models():
|
|
monitor.sim.select_model(model_name)
|
|
monitor.sim.params["noise_multipler"] = 10
|
|
monitor.sim.params["ref_motor"] = "samx"
|
|
if "c" in monitor.sim.params:
|
|
monitor.sim.params["c"] = center
|
|
elif "center" in monitor.sim.params:
|
|
monitor.sim.params["center"] = center
|
|
assert isinstance(monitor.read()[monitor.name]["value"], monitor.BIT_DEPTH)
|
|
expected_value = monitor.sim._model.eval(monitor.sim._model_params, x=motor_pos)
|
|
print(expected_value, monitor.read()[monitor.name]["value"])
|
|
tolerance = (
|
|
monitor.sim.params["noise_multipler"] + 1
|
|
) # due to ceiling in calculation, but maximum +1int
|
|
assert np.isclose(
|
|
monitor.read()[monitor.name]["value"],
|
|
expected_value,
|
|
atol=monitor.sim.params["noise_multipler"] + 1,
|
|
)
|
|
|
|
|
|
@pytest.mark.parametrize("amplitude, noise_multiplier", [(0, 1), (100, 10), (1000, 50)])
|
|
def test_camera_readback(camera, amplitude, noise_multiplier):
|
|
"""Test the readback method of SimMonitor."""
|
|
for model_name in camera.sim.get_models():
|
|
camera.sim.select_model(model_name)
|
|
camera.sim.params = {"noise_multiplier": noise_multiplier}
|
|
camera.sim.params = {"amplitude": amplitude}
|
|
camera.sim.params = {"noise": "poisson"}
|
|
assert camera.image.get().shape == camera.SHAPE
|
|
assert isinstance(camera.image.get()[0, 0], camera.BIT_DEPTH)
|
|
camera.sim.params = {"noise": "uniform"}
|
|
camera.sim.params = {"hot_pixel_coords": []}
|
|
camera.sim.params = {"hot_pixel_values": []}
|
|
camera.sim.params = {"hot_pixel_types": []}
|
|
assert camera.image.get().shape == camera.SHAPE
|
|
assert isinstance(camera.image.get()[0, 0], camera.BIT_DEPTH)
|
|
assert (camera.image.get() <= (amplitude + noise_multiplier + 1)).all()
|
|
|
|
|
|
def test_positioner_move(positioner):
|
|
"""Test the move method of SimPositioner."""
|
|
positioner.move(0).wait()
|
|
assert np.isclose(
|
|
positioner.read()[positioner.name]["value"], 0, atol=positioner.tolerance.get()
|
|
)
|
|
positioner.move(10).wait()
|
|
assert np.isclose(
|
|
positioner.read()[positioner.name]["value"], 10, atol=positioner.tolerance.get()
|
|
)
|
|
|
|
|
|
def test_positioner_motor_is_moving_signal(positioner):
|
|
"""Test that motor is moving is 0 and 1 while (not) moving"""
|
|
positioner.move(0).wait()
|
|
positioner.velocity.set(2)
|
|
assert positioner.motor_is_moving.get() == 0
|
|
status = positioner.move(5)
|
|
assert positioner.motor_is_moving.get() == 1
|
|
status.wait()
|
|
assert positioner.motor_is_moving.get() == 0
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"initial_position, final_position, max_velocity, acceleration",
|
|
[(0, 100, 5, 20), (0, 1, 5, 20)], # Trapezoidal profile # Triangular profile
|
|
)
|
|
def test_linear_traj(initial_position, final_position, max_velocity, acceleration):
|
|
"""Test the LinearTrajectory class"""
|
|
initial_time = time.time()
|
|
trajectory = LinearTrajectory(
|
|
initial_position, final_position, max_velocity, acceleration, initial_time
|
|
)
|
|
|
|
# Test acceleration phase
|
|
t1 = initial_time + trajectory.time_accel / 2 # Halfway through acceleration phase
|
|
pos1 = trajectory.position(t1)
|
|
expected_pos1 = initial_position + 0.5 * acceleration * (trajectory.time_accel / 2) ** 2
|
|
assert np.isclose(pos1, expected_pos1), f"Expected {expected_pos1}, got {pos1}"
|
|
|
|
# Test constant velocity phase
|
|
if trajectory.time_const_vel > 0:
|
|
t2 = (
|
|
initial_time + trajectory.time_accel + trajectory.time_const_vel / 2
|
|
) # Halfway through constant velocity phase
|
|
pos2 = trajectory.position(t2)
|
|
expected_pos2 = (
|
|
initial_position
|
|
+ trajectory.distance_to_max_velocity
|
|
+ max_velocity * (t2 - initial_time - trajectory.time_accel)
|
|
)
|
|
assert np.isclose(pos2, expected_pos2), f"Expected {expected_pos2}, got {pos2}"
|
|
|
|
# Test deceleration phase
|
|
t3 = (
|
|
initial_time + trajectory.total_time - trajectory.time_decel / 2
|
|
) # Halfway through deceleration phase
|
|
pos3 = trajectory.position(t3)
|
|
t_decel = t3 - (initial_time + trajectory.time_accel + trajectory.time_const_vel)
|
|
expected_pos3 = final_position - 0.5 * acceleration * (trajectory.time_decel / 2) ** 2
|
|
assert np.isclose(pos3, expected_pos3), f"Expected {expected_pos3}, got {pos3}"
|
|
|
|
# Test end
|
|
t4 = initial_time + trajectory.total_time + 0.1 # Slightly after end
|
|
pos4 = trajectory.position(t4)
|
|
assert pos4 == final_position
|
|
assert trajectory.ended
|
|
|
|
|
|
def test_sim_linear_trajectory_positioner(linear_traj_positioner):
|
|
vel = 5 # velocity 5 m.s^-1
|
|
acc = 20 # acceleration 20 m.s^-2
|
|
linear_traj_positioner.velocity.set(vel)
|
|
linear_traj_positioner.acceleration.set(vel / acc) # acctime 250 ms
|
|
linear_traj_positioner.update_frequency = 100
|
|
assert linear_traj_positioner.position == 0
|
|
|
|
t0 = time.time()
|
|
trajectory = LinearTrajectory(0, 50, vel, acc, t0)
|
|
t2 = (
|
|
t0 + trajectory.time_accel + trajectory.time_const_vel / 2
|
|
) # Halfway through constant velocity phase
|
|
decel_distance = trajectory.position(t0 + trajectory.time_accel)
|
|
expected_pos = trajectory.position(t2) + decel_distance
|
|
|
|
linear_traj_positioner.move(50)
|
|
# move is non-blocking, so sleep until it is time to stop:
|
|
time.sleep(t2 - t0)
|
|
linear_traj_positioner.stop()
|
|
|
|
# ensure position is ok
|
|
assert pytest.approx(linear_traj_positioner.position - expected_pos, abs=1e-1) == 0
|
|
|
|
|
|
@pytest.mark.parametrize("proxy_active", [True, False])
|
|
def test_sim_camera_proxies(camera, proxy_active):
|
|
"""Test mocking compute_method with framework class"""
|
|
camera.device_manager.add_device("test_proxy")
|
|
if proxy_active:
|
|
camera._registered_proxies["test_proxy"] = camera.image.name
|
|
else:
|
|
camera._registered_proxies = {}
|
|
proxy = camera.device_manager.devices["test_proxy"]
|
|
mock_method = mock.MagicMock()
|
|
mock_obj = proxy.obj
|
|
mock_obj.lookup = mock.MagicMock()
|
|
mock_obj.lookup.return_value = {camera.name: {"method": mock_method, "args": 1, "kwargs": 1}}
|
|
camera.image.read()
|
|
if proxy_active:
|
|
assert len(mock_obj.lookup.mock_calls) > 0
|
|
elif not proxy_active:
|
|
assert len(mock_obj.lookup.mock_calls) == 0
|
|
|
|
|
|
def test_BECDeviceBase():
|
|
# Test the BECDeviceBase class
|
|
bec_device_base = BECDeviceBase(name="test")
|
|
assert isinstance(bec_device_base, BECDevice)
|
|
assert bec_device_base.connected is True
|
|
signal = Signal(name="signal")
|
|
assert isinstance(signal, BECDevice)
|
|
device = Device(name="device")
|
|
assert isinstance(device, BECDevice)
|
|
|
|
|
|
def test_h5proxy(h5proxy_fixture, camera):
|
|
"""Test h5 camera proxy read from h5 file"""
|
|
h5proxy, camera = h5proxy_fixture
|
|
mock_proxy = mock.MagicMock()
|
|
camera.device_manager.devices.update({h5proxy.name: mock_proxy})
|
|
mock_proxy.enabled = True
|
|
mock_proxy.obj = h5proxy
|
|
fname = os.path.expanduser("tests/test_data/h5_test_file.h5")
|
|
h5entry = "entry/data/data"
|
|
with h5py.File(fname, "r") as f:
|
|
data = f[h5entry][...]
|
|
# pylint: disable=protected-access
|
|
h5proxy._update_device_config(
|
|
{camera.name: {"signal_name": "image", "file_source": fname, "h5_entry": h5entry}}
|
|
)
|
|
camera._registered_proxies.update({h5proxy.name: camera.image.name})
|
|
camera.sim.params = {"noise": "none", "noise_multiplier": 0}
|
|
camera.scaninfo.sim_mode = True
|
|
# pylint: disable=no-member
|
|
camera.image_shape.set(data.shape[1:])
|
|
camera.stage()
|
|
img = camera.image.get()
|
|
assert (img == data[0, ...]).all()
|
|
camera.unstage()
|
|
|
|
|
|
def test_slitproxy(slitproxy_fixture):
|
|
"""Test slit proxy to compute readback from readback of positioner samx"""
|
|
proxy, camera, samx = slitproxy_fixture
|
|
for dev_name, dev in proxy.device_manager.devices.items():
|
|
camera.device_manager.devices.update({dev_name: mock.MagicMock()})
|
|
camera.device_manager.devices.get(dev_name).obj = dev
|
|
camera.device_manager.devices.get(dev_name).enabled = True
|
|
px_size = 0.5
|
|
slitwidth = 2
|
|
proxy._update_device_config(
|
|
{
|
|
camera.name: {
|
|
"signal_name": "image",
|
|
"center_offset": [0, 0],
|
|
"covariance": [[1000, 500], [200, 1000]],
|
|
"pixel_size": px_size,
|
|
"ref_motors": [samx.name],
|
|
"slit_width": [slitwidth],
|
|
"motor_dir": [0],
|
|
}
|
|
}
|
|
)
|
|
camera._registered_proxies.update({proxy.name: camera.image.name})
|
|
mock_proxy = mock.MagicMock()
|
|
mock_samx = mock.MagicMock()
|
|
mock_camera = mock.MagicMock()
|
|
camera.device_manager.devices.update(
|
|
{proxy.name: mock_proxy, samx.name: mock_samx, camera.name: mock_camera}
|
|
)
|
|
|
|
mock_proxy.enabled = True
|
|
mock_samx.enabled = True
|
|
mock_camera.enabled = True
|
|
mock_camera.obj = camera
|
|
mock_samx.obj = samx
|
|
mock_proxy.obj = proxy
|
|
camera.sim.params = {"noise": "none", "noise_multiplier": 0, "hot_pixel_values": [0, 0, 0]}
|
|
samx.delay = 0
|
|
samx_pos = 0
|
|
samx.move(samx_pos)
|
|
proxy._gaussian_blur_sigma = 0
|
|
img = camera.image.get()
|
|
edges = (
|
|
int(img.shape[0] // 2 - samx_pos / px_size - slitwidth / (2 * px_size)),
|
|
int(img.shape[0] // 2 + samx_pos / px_size + slitwidth / (2 * px_size)),
|
|
)
|
|
assert (img[:, : edges[0]] == 0).all()
|
|
assert (img[:, edges[1] :] == 0).all()
|
|
samx_pos = 13.3
|
|
samx.move(samx_pos)
|
|
img = camera.image.get()
|
|
edges = (
|
|
int(img.shape[0] // 2 + samx_pos / px_size - slitwidth / (2 * px_size)),
|
|
int(img.shape[0] // 2 + samx_pos / px_size + slitwidth / (2 * px_size)),
|
|
)
|
|
assert (img[:, : edges[0]] == 0).all()
|
|
assert (img[:, edges[1] :] == 0).all()
|
|
|
|
|
|
def test_cam_stage_h5writer(camera):
|
|
"""Test the H5Writer class"""
|
|
with (
|
|
mock.patch.object(camera, "h5_writer") as mock_h5_writer,
|
|
mock.patch.object(
|
|
camera.custom_prepare, "publish_file_location"
|
|
) as mock_publish_file_location,
|
|
):
|
|
camera.scaninfo.num_points = 10
|
|
camera.scaninfo.frames_per_trigger = 1
|
|
camera.scaninfo.exp_time = 1
|
|
camera.stage()
|
|
assert mock_h5_writer.prepare.call_count == 0
|
|
camera.unstage()
|
|
camera.write_to_disk.put(True)
|
|
camera.stage()
|
|
calls = [mock.call(file_path="", h5_entry="/entry/data/data")]
|
|
assert mock_h5_writer.prepare.mock_calls == calls
|
|
# mock_h5_writer.prepare
|
|
|
|
|
|
def test_cam_complete(camera):
|
|
"""Test the complete method of SimCamera."""
|
|
with mock.patch.object(camera, "h5_writer") as mock_h5_writer:
|
|
status = camera.complete()
|
|
status_wait(status)
|
|
assert status.done is True
|
|
assert status.success is True
|
|
assert mock_h5_writer.write_data.call_count == 0
|
|
camera.write_to_disk.put(True)
|
|
status = camera.complete()
|
|
status_wait(status)
|
|
assert mock_h5_writer.write_data.call_count == 1
|
|
|
|
|
|
def test_cam_trigger(camera):
|
|
"""Test the trigger method of SimCamera."""
|
|
with mock.patch.object(camera, "h5_writer") as mock_h5_writer:
|
|
data = []
|
|
status = camera.trigger()
|
|
status_wait(status)
|
|
assert status.done is True
|
|
assert status.success is True
|
|
assert mock_h5_writer.receive_data.call_count == 0
|
|
camera.write_to_disk.put(True)
|
|
status = camera.trigger()
|
|
status_wait(status)
|
|
assert mock_h5_writer.receive_data.call_count == 1
|
|
status = camera.trigger()
|
|
status_wait(status)
|
|
assert mock_h5_writer.receive_data.call_count == 2
|
|
|
|
|
|
def test_h5writer():
|
|
"""Test the H5Writer class"""
|
|
|
|
h5_writer = H5Writer()
|
|
with mock.patch.object(h5_writer, "create_dir") as mock_create_dir:
|
|
h5_writer.data_container = [0, 1, 2]
|
|
h5_writer.prepare(file_path="test.h5", h5_entry="entry/data/data")
|
|
assert mock_create_dir.call_count == 1
|
|
assert h5_writer.data_container == []
|
|
assert h5_writer.file_path == "test.h5"
|
|
assert h5_writer.h5_entry == "entry/data/data"
|
|
|
|
data = [0, 1, 2, 3]
|
|
h5_writer.receive_data(data)
|
|
assert h5_writer.data_container == [data]
|
|
h5_writer.receive_data(0)
|
|
assert h5_writer.data_container == [data, 0]
|
|
|
|
|
|
def test_async_monitor_stage(async_monitor):
|
|
"""Test the stage method of SimMonitorAsync."""
|
|
async_monitor.stage()
|
|
assert async_monitor.data_buffer["value"] == []
|
|
assert async_monitor.data_buffer["timestamp"] == []
|
|
|
|
|
|
def test_async_monitor_prep_random_interval(async_monitor):
|
|
"""Test the stage method of SimMonitorAsync."""
|
|
async_monitor.custom_prepare.prep_random_interval()
|
|
assert async_monitor.custom_prepare._counter == 0
|
|
assert async_monitor.current_trigger.get() == 0
|
|
assert 0 < async_monitor.custom_prepare._random_send_interval < 10
|
|
|
|
|
|
def test_async_monitor_complete(async_monitor):
|
|
"""Test the on_complete method of SimMonitorAsync."""
|
|
with (
|
|
mock.patch.object(async_monitor.custom_prepare, "_send_data_to_bec") as mock_send,
|
|
mock.patch.object(async_monitor.custom_prepare, "prep_random_interval") as mock_prep,
|
|
):
|
|
status = async_monitor.complete()
|
|
status_wait(status)
|
|
assert status.done is True
|
|
assert status.success is True
|
|
assert mock_send.call_count == 0
|
|
async_monitor.data_buffer["value"].append(0)
|
|
status = async_monitor.complete()
|
|
status_wait(status)
|
|
assert status.done is True
|
|
assert status.success is True
|
|
assert mock_send.call_count == 1
|
|
|
|
|
|
def test_async_mon_on_trigger(async_monitor):
|
|
"""Test the on_trigger method of SimMonitorAsync."""
|
|
with (mock.patch.object(async_monitor.custom_prepare, "_send_data_to_bec") as mock_send,):
|
|
async_monitor.custom_prepare.on_stage()
|
|
upper_limit = async_monitor.custom_prepare._random_send_interval
|
|
for ii in range(1, upper_limit + 1):
|
|
status = async_monitor.custom_prepare.on_trigger()
|
|
status_wait(status)
|
|
assert async_monitor.current_trigger.get() == ii
|
|
assert mock_send.call_count == 1
|
|
|
|
|
|
def test_async_mon_send_data_to_bec(async_monitor):
|
|
"""Test the _send_data_to_bec method of SimMonitorAsync."""
|
|
async_monitor.scaninfo.scan_msg = SimpleNamespace(metadata={})
|
|
async_monitor.data_buffer.update({"value": [0, 5], "timestamp": [0, 0]})
|
|
with mock.patch.object(async_monitor.connector, "xadd") as mock_xadd:
|
|
async_monitor.custom_prepare._send_data_to_bec()
|
|
dev_msg = messages.DeviceMessage(
|
|
signals={async_monitor.readback.name: async_monitor.data_buffer},
|
|
metadata={"async_update": async_monitor.async_update.get()},
|
|
)
|
|
|
|
call = [
|
|
mock.call(
|
|
MessageEndpoints.device_async_readback(
|
|
scan_id=async_monitor.scaninfo.scan_id, device=async_monitor.name
|
|
),
|
|
{"data": dev_msg},
|
|
expire=async_monitor.custom_prepare._stream_ttl,
|
|
)
|
|
]
|
|
assert mock_xadd.mock_calls == call
|
|
assert async_monitor.data_buffer["value"] == []
|
|
|
|
|
|
def test_positioner_updated_timestamp(positioner):
|
|
"""Test the updated_timestamp method of SimPositioner."""
|
|
positioner.sim.sim_state[positioner.name]["value"] = 1
|
|
readback = positioner.read()[positioner.name]
|
|
timestamp = readback["timestamp"]
|
|
assert readback["value"] == 1
|
|
positioner.sim.sim_state[positioner.name]["value"] = 5
|
|
readback = positioner.read()[positioner.name]
|
|
assert readback["value"] == 5
|
|
assert readback["timestamp"] > timestamp
|
|
|
|
|
|
def test_waveform(waveform):
|
|
"""Test the SimWaveform class"""
|
|
waveform.sim.select_model("GaussianModel")
|
|
waveform.sim.params = {"amplitude": 500, "center": 500, "sigma": 10}
|
|
data = waveform.waveform.get()
|
|
assert isinstance(data, np.ndarray)
|
|
assert data.shape == waveform.SHAPE
|
|
assert np.isclose(np.argmax(data), 500, atol=5)
|
|
waveform.waveform_shape.put(50)
|
|
data = waveform.waveform.get()
|
|
for model in waveform.sim.get_all_sim_models():
|
|
waveform.sim.select_model(model)
|
|
waveform.waveform.get()
|
|
# Now also test the async readback
|
|
mock_connector = waveform.connector = mock.MagicMock()
|
|
mock_run_subs = waveform._run_subs = mock.MagicMock()
|
|
waveform.scaninfo.scan_msg = SimpleNamespace(metadata={})
|
|
waveform.scaninfo.scan_id = "test"
|
|
status = waveform.trigger()
|
|
timer = 0
|
|
while not status.done:
|
|
time.sleep(0.1)
|
|
timer += 0.1
|
|
if timer > 5:
|
|
raise TimeoutError("Trigger did not complete")
|
|
assert status.done is True
|
|
assert mock_connector.xadd.call_count == 1
|
|
assert mock_run_subs.call_count == 1
|