"""Tests for Trigger device.""" import threading from unittest import mock import ophyd import pytest from bec_server.device_server.tests.utils import DMMock from ophyd import DeviceStatus from ophyd_devices.tests.utils import MockPV, patch_dual_pvs from superxas_bec.devices.trigger import ContinuousSamplingMode, Trigger # pylint: disable=protected-access # pylint: disable=redefined-outer-name @pytest.fixture(scope="function") def trigger(): """Trigger device with mocked EPICS PVs.""" name = "trigger" prefix = "X10DA-ES1:" with mock.patch.object(ophyd, "cl") as mock_cl: mock_cl.get_pv = MockPV mock_cl.thread_class = threading.Thread dev = Trigger(name=name, prefix=prefix, device_manager=DMMock()) patch_dual_pvs(dev) yield dev @pytest.mark.parametrize(["exp_time", "cycles"], [(0.1, 1), (0.5, 2), (2, 10)]) def test_devices_trigger_stage_core_scans(trigger, exp_time, cycles): """Test on_connected method of Trigger device. The pytest.mark.parametrize decorator is used to run the test for each parameter in the list. """ assert trigger.prefix == "X10DA-ES1:" assert trigger.name == "trigger" assert trigger._pv_timeout == 1 # trigger.on_connected() trigger._pv_timeout = 0.2 trigger.start_csmpl.put(ContinuousSamplingMode.ON) assert trigger.start_csmpl.get() == ContinuousSamplingMode.ON # Set scan_info information for scan trigger.scan_info.msg.scan_name = "step_scan" trigger.scan_info.msg.scan_parameters["exp_time"] = exp_time # On stage should set exposure time status = trigger.stage() if isinstance(status, DeviceStatus): status.wait() assert trigger.start_csmpl.get() == ContinuousSamplingMode.OFF # cycles will be multiple of exp_time/0.2 as int, minimum 1. assert trigger.total_cycles.get() == cycles def test_devices_trigger_unstage(trigger): """ Test on_unstage method of Trigger device. This should put start_csmpl to ON. """ trigger.start_csmpl.put(ContinuousSamplingMode.OFF) assert trigger.start_csmpl.get() == ContinuousSamplingMode.OFF status = trigger.unstage() status.wait() assert trigger.start_csmpl.get() == ContinuousSamplingMode.ON def test_devices_trigger_stop(trigger): """ Test on_stop method of Trigger device. This should stop the task_handler. """ assert trigger.stopped is False with mock.patch.object(trigger, "task_handler") as mock_handler: trigger.stop() mock_handler.shutdown.assert_called_once() assert trigger.stopped is True def test_devices_trigger_trigger(trigger): """Test trigger method of Trigger device.""" # TODO we should use the ScanStatusMessage to update scan_info here falcon = mock.MagicMock() falcon.name.return_value = "falcon" status = DeviceStatus(device=falcon) status.set_finished() falcon._stop_erase_and_wait_for_acquiring.return_value = status trigger.device_manager.devices["falcon"] = falcon trigger_status = DeviceStatus(device=trigger) trigger_status.set_finished() with mock.patch.object( trigger.task_handler, "submit_task", return_value=trigger_status ) as mock_submit: status = trigger.trigger() assert falcon._stop_erase_and_wait_for_acquiring.call_count == 1 assert trigger.smpl.get() == 1 # smpl called with 1 # TODO check that the task_handler is called with the correct function # This is currently not easily testable assert mock_submit.call_count == 1 assert trigger_status == status