106 lines
3.5 KiB
Python
106 lines
3.5 KiB
Python
"""Tests for Trigger device."""
|
|
|
|
import threading
|
|
from unittest import mock
|
|
|
|
import ophyd
|
|
import pytest
|
|
from bec_server.device_server.tests.utils import DMMock
|
|
from ophyd import DeviceStatus
|
|
from ophyd_devices.tests.utils import MockPV, patch_dual_pvs
|
|
|
|
from superxas_bec.devices.trigger import ContinuousSamplingMode, Trigger
|
|
|
|
# pylint: disable=protected-access
|
|
|
|
|
|
@pytest.fixture(scope="function")
def trigger():
    """Yield a ``Trigger`` device built on top of a mocked ophyd control layer.

    While the fixture is active, ``ophyd.cl`` is patched so that ``get_pv``
    is ``MockPV`` and ``thread_class`` is ``threading.Thread``. The device is
    passed through ``patch_dual_pvs`` before it is handed to the test.
    """
    with mock.patch.object(ophyd, "cl") as control_layer:
        control_layer.get_pv = MockPV
        control_layer.thread_class = threading.Thread
        device = Trigger(name="trigger", prefix="X10DA-ES1:", device_manager=DMMock())
        patch_dual_pvs(device)
        yield device
|
|
|
|
|
|
@pytest.mark.parametrize(["exp_time", "cycles"], [(0.1, 1), (0.5, 2), (2, 10)])
def test_devices_trigger_stage_core_scans(trigger, exp_time, cycles):
    """Check that staging the Trigger device for a step scan configures it.

    The test runs once per ``(exp_time, cycles)`` pair of the parametrization.
    After ``on_connected`` and with continuous sampling switched ON, staging
    must switch continuous sampling OFF and set ``total_cycles`` to the
    expected ``cycles`` value.
    """
    # Sanity checks on the freshly created fixture device.
    assert trigger.prefix == "X10DA-ES1:"
    assert trigger.name == "trigger"
    assert trigger._pv_timeout == 1

    trigger.on_connected()
    # Use a shorter PV timeout than the default asserted above.
    trigger._pv_timeout = 0.2
    trigger.start_csmpl.put(ContinuousSamplingMode.ON)
    assert trigger.start_csmpl.get() == ContinuousSamplingMode.ON

    # Provide the scan information the device reads while staging.
    scan_msg = trigger.scan_info.msg
    scan_msg.scan_name = "step_scan"
    scan_msg.scan_parameters["exp_time"] = exp_time

    # Staging may or may not hand back a status object; only wait if it does.
    stage_result = trigger.stage()
    if isinstance(stage_result, DeviceStatus):
        stage_result.wait()

    assert trigger.start_csmpl.get() == ContinuousSamplingMode.OFF
    # cycles will be multiple of exp_time/0.2 as int, minimum 1.
    assert trigger.total_cycles.get() == cycles
|
|
|
|
|
|
def test_devices_trigger_unstage(trigger):
    """Check that unstaging the Trigger device re-enables continuous sampling.

    Continuous sampling is first forced OFF; once the status returned by
    ``unstage`` has completed, ``start_csmpl`` must read ON.
    """
    sampling = trigger.start_csmpl
    sampling.put(ContinuousSamplingMode.OFF)
    assert sampling.get() == ContinuousSamplingMode.OFF

    # Block until the unstage status has finished before checking the signal.
    trigger.unstage().wait()

    assert sampling.get() == ContinuousSamplingMode.ON
|
|
|
|
|
|
def test_devices_trigger_stop(trigger):
    """Check that stopping the Trigger device shuts down its task handler.

    The device starts out not stopped; after ``stop`` the (mocked) task
    handler's ``shutdown`` must have been called exactly once and the
    ``stopped`` flag must be set.
    """
    assert trigger.stopped is False

    # Replace the task handler so the shutdown call can be observed.
    with mock.patch.object(trigger, "task_handler") as handler:
        trigger.stop()
        handler.shutdown.assert_called_once()
        assert trigger.stopped is True
|
|
|
|
|
|
def test_devices_trigger_trigger(trigger):
    """Check the ``trigger`` method of the Trigger device.

    A mocked ``falcon`` device is registered in the device manager and the
    task handler's ``submit_task`` is patched to return a finished status.
    Triggering must then:

    * call ``falcon._stop_erase_and_wait_for_acquiring`` exactly once,
    * leave ``smpl`` reading 1,
    * submit exactly one task to the task handler, and
    * return the status produced by ``submit_task``.
    """
    # TODO we should use the ScanStatusMessage to update scan_info here
    falcon = mock.MagicMock()
    # Device names are read as a plain attribute (cf. ``trigger.name``), so
    # assign it directly. Setting ``falcon.name.return_value`` would only
    # configure the result of *calling* ``falcon.name()`` and leave
    # ``falcon.name`` itself a MagicMock.
    falcon.name = "falcon"

    # Already-finished status handed out by the mocked falcon.
    falcon_status = DeviceStatus(device=falcon)
    falcon_status.set_finished()
    falcon._stop_erase_and_wait_for_acquiring.return_value = falcon_status
    trigger.device_manager.devices["falcon"] = falcon

    # Already-finished status handed out by the patched task handler.
    trigger_status = DeviceStatus(device=trigger)
    trigger_status.set_finished()

    with mock.patch.object(
        trigger.task_handler, "submit_task", return_value=trigger_status
    ) as mock_submit:
        returned_status = trigger.trigger()

        falcon._stop_erase_and_wait_for_acquiring.assert_called_once()
        assert trigger.smpl.get() == 1  # smpl called with 1
        # TODO check that the task_handler is called with the correct function
        # This is currently not easily testable
        mock_submit.assert_called_once()
        assert trigger_status == returned_status
|