import threading
import time
from unittest import mock

import numpy as np
import ophyd
import pytest
from bec_server.device_server.tests.utils import DMMock
from ophyd_devices.tests.utils import MockPV, patch_dual_pvs

from phoenix_bec.devices.phoenix_trigger import SAMPLING, PhoenixTrigger


@pytest.fixture(scope="function")
def mock_trigger():
    """Yield a ``PhoenixTrigger`` whose external dependencies are mocked.

    While the fixture is active the following are patched:

    * the ``connector`` attribute of the ``DMMock`` device manager,
    * ``FileWriter`` as looked up in ``bec_device_base``,
    * ``PSIDetectorBase._update_service_config``,
    * ``ophyd.cl`` (its ``get_pv`` is ``MockPV`` and its ``thread_class`` is
      ``threading.Thread``),
    * ``PhoenixTrigger._init``.

    The device is yielded from inside the ``with`` block, so every patch
    stays in place for the duration of the requesting test.
    """
    name = "phoenix_trigger"
    prefix = "X07MB-OP2:"
    dm = DMMock()
    # Context managers are entered left-to-right, i.e. in the same order as
    # the previously nested ``with`` statements.
    with (
        mock.patch.object(dm, "connector"),
        mock.patch("ophyd_devices.interfaces.base_classes.bec_device_base.FileWriter"),
        mock.patch(
            "ophyd_devices.interfaces.base_classes.psi_detector_base.PSIDetectorBase._update_service_config"
        ),
        mock.patch.object(ophyd, "cl") as mock_cl,
        mock.patch.object(PhoenixTrigger, "_init"),
    ):
        # Configure the mocked control layer before the device is built.
        mock_cl.get_pv = MockPV
        mock_cl.thread_class = threading.Thread

        det = PhoenixTrigger(name=name, prefix=prefix, device_manager=dm)
        patch_dual_pvs(det)
        # NOTE(review): presumably shortened so signal waits fail fast in
        # tests -- confirm against PhoenixTrigger.
        det.TIMEOUT_FOR_SIGNALS = 0.1
        yield det


def test_phoenix_trigger_init(mock_trigger):
    """The constructed device keeps the name and prefix it was given."""
    assert mock_trigger.name == "phoenix_trigger"
    assert mock_trigger.prefix == "X07MB-OP2:"


def test_phoenix_trigger_stage(mock_trigger):
    """``stage()`` loads scan metadata once and sets the sampling signals.

    For a "step" scan with ``exp_time = 1`` the test expects
    ``start_csmpl == 0``, ``total_cycles == ceil(exp_time * 5)`` and
    ``smpl == 1`` after staging.
    """
    exp_time = 1
    with mock.patch.object(mock_trigger.scaninfo, "load_scan_metadata") as mock_load_scan_metadata:
        mock_trigger.scaninfo.scan_type = "step"
        mock_trigger.scaninfo.exp_time = exp_time
        mock_trigger.stage()
        assert mock_load_scan_metadata.call_count == 1
        assert mock_trigger.start_csmpl.get() == 0
        assert mock_trigger.total_cycles.get() == np.ceil(exp_time * 5)
        assert mock_trigger.smpl.get() == 1


def test_phoenix_trigger_unstage(mock_trigger):
    """``unstage()`` calls ``custom_prepare.on_stop`` exactly once."""
    with mock.patch.object(mock_trigger.custom_prepare, "on_stop") as mock_on_stop:
        mock_trigger.unstage()
        assert mock_on_stop.call_count == 1


def test_phoenix_trigger_stop(mock_trigger):
    """``stop()`` marks the device stopped and writes to the sampling signals.

    The ``smpl_done`` readback is preset to ``SAMPLING.RUNNING``.  After
    ``stop()`` the test expects ``stopped`` to be True, ``start_csmpl`` to
    read 1 and ``smpl.put`` to have been called twice with the value 1.
    """
    with mock.patch.object(mock_trigger.smpl, "put") as mock_smpl_put:
        mock_trigger.smpl_done._read_pv.mock_data = SAMPLING.RUNNING
        mock_trigger.stop()
        assert mock_trigger.stopped is True
        # assert mock_trigger.total_cycles.get() == 5  # 5 cycles is too tight during development
        assert mock_trigger.start_csmpl.get() == 1
        assert mock_smpl_put.call_args_list == [mock.call(1), mock.call(1)]


@pytest.mark.skip(
    reason="Disabled for now: device names etc. will change and other devices will be added."
)
def test_phoenix_trigger_trigger(mock_trigger):
    """``trigger()`` delegates the wait to ``custom_prepare.wait_with_status``.

    The test registers a ``falcon_nohdf5`` device whose ``state.get()``
    returns 1, then checks that ``wait_with_status`` is called once with
    ``signal_conditions == [(smpl_done.get, SAMPLING.DONE)]``,
    ``timeout == 5 * exp_time`` and ``check_stopped is True``.

    NOTE(review): the original note on this test described first checking a
    timeout while ``smpl_done`` is not done and then a correctly resolved
    status; the assertions below only cover the ``wait_with_status`` call
    arguments. Revisit when the test is re-enabled.
    """
    exp_time = 0.05
    mock_trigger.device_manager.add_device("falcon_nohdf5")
    falcon_state = mock_trigger.device_manager.devices.falcon_nohdf5.state = mock.MagicMock()
    falcon_state.get = mock.MagicMock(return_value=1)
    mock_trigger.scaninfo.scan_type = "step"
    mock_trigger.scaninfo.exp_time = exp_time
    with mock.patch.object(
        mock_trigger.custom_prepare, "wait_with_status", return_value=mock.MagicMock()
    ) as mock_wait_with_status:
        mock_trigger.trigger()
        assert mock_wait_with_status.call_count == 1
        assert mock_wait_with_status.call_args[1]["signal_conditions"] == [
            (mock_trigger.smpl_done.get, SAMPLING.DONE)
        ]
        assert mock_wait_with_status.call_args[1]["timeout"] == 5 * exp_time
        assert mock_wait_with_status.call_args[1]["check_stopped"] is True