Files
phoenix_bec/tests/tests_devices/test_phoenix_trigger.py

104 lines
3.9 KiB
Python

import threading
import time
from unittest import mock
import numpy as np
import ophyd
import pytest
from bec_server.device_server.tests.utils import DMMock
from ophyd_devices.tests.utils import MockPV, patch_dual_pvs
from phoenix_bec.devices.phoenix_trigger import SAMPLING, PhoenixTrigger
@pytest.fixture(scope="function")
def mock_trigger():
name = "phoenix_trigger"
prefix = "X07MB-OP2:"
dm = DMMock()
with mock.patch.object(dm, "connector"):
with (
mock.patch(
"ophyd_devices.interfaces.base_classes.bec_device_base.FileWriter"
) as filemixin,
mock.patch(
"ophyd_devices.interfaces.base_classes.psi_detector_base.PSIDetectorBase._update_service_config"
) as mock_service_config,
):
with mock.patch.object(ophyd, "cl") as mock_cl:
mock_cl.get_pv = MockPV
mock_cl.thread_class = threading.Thread
with mock.patch.object(PhoenixTrigger, "_init"):
det = PhoenixTrigger(name=name, prefix=prefix, device_manager=dm)
patch_dual_pvs(det)
det.TIMEOUT_FOR_SIGNALS = 0.1
yield det
def test_phoenix_trigger_init(mock_trigger):
"""Test PhoenixTrigger init"""
assert mock_trigger.name == "phoenix_trigger"
assert mock_trigger.prefix == "X07MB-OP2:"
def test_phoenix_trigger_stage(mock_trigger):
"""Test PhoenixTrigger on_stage"""
with mock.patch.object(mock_trigger.scaninfo, "load_scan_metadata") as mock_load_scan_metadata:
mock_trigger.scaninfo.scan_type = "step"
mock_trigger.scaninfo.exp_time = exp_time = 1
mock_trigger.stage()
assert mock_load_scan_metadata.call_count == 1
assert mock_trigger.start_csmpl.get() == 0
assert mock_trigger.total_cycles.get() == np.ceil(exp_time * 5)
assert mock_trigger.smpl.get() == 1
def test_phoenix_trigger_unstage(mock_trigger):
"""Test PhoenixTrigger on_unstage"""
with mock.patch.object(mock_trigger.custom_prepare, "on_stop") as mock_on_stop:
mock_trigger.unstage()
assert mock_on_stop.call_count == 1
def test_phoenix_trigger_stop(mock_trigger):
"""Test PhoenixTrigger on_stop"""
with mock.patch.object(mock_trigger.smpl, "put") as mock_smpl_put:
mock_trigger.smpl_done._read_pv.mock_data = SAMPLING.RUNNING
mock_trigger.stop()
assert mock_trigger.stopped is True
# assert mock_trigger.total_cycles.get() == 5
# 5 cycles is too tight during development
assert mock_trigger.start_csmpl.get() == 1
assert mock_smpl_put.call_args_list == [mock.call(1), mock.call(1)]
"""
uncomment this test, as device names etc will change
and as other devices will bee added
def test_phoenix_trigger_trigger(mock_trigger):
#Test PhoenixTrigger on_trigger
#
#irst test that the trigger timeouts due to readback from smpl_done not being done.
#Afterwards, check that status object resolved correctly if smpl_done is done.
#
exp_time = 0.05
mock_trigger.device_manager.add_device("falcon_nohdf5")
falcon_state = mock_trigger.device_manager.devices.falcon_nohdf5.state = mock.MagicMock()
falcon_state.get = mock.MagicMock(return_value=1)
mock_trigger.scaninfo.scan_type = "step"
mock_trigger.scaninfo.exp_time = exp_time
with mock.patch.object(
mock_trigger.custom_prepare, "wait_with_status", return_value=mock.MagicMock()
) as mock_wait_with_status:
status = mock_trigger.trigger()
assert mock_wait_with_status.call_count == 1
assert mock_wait_with_status.call_args[1]["signal_conditions"] == [
(mock_trigger.smpl_done.get, SAMPLING.DONE)
]
assert mock_wait_with_status.call_args[1]["timeout"] == 5 * exp_time
assert mock_wait_with_status.call_args[1]["check_stopped"] is True
"""