# ophyd_devices/tests/test_base_classes.py
#
# 206 lines
# 6.5 KiB
# Python

# pylint: skip-file
import time
from unittest import mock
import pytest
from ophyd import DeviceStatus, Staged
from ophyd.utils.errors import RedundantStaging
from ophyd_devices.interfaces.base_classes.bec_device_base import BECDeviceBase, CustomPrepare
from ophyd_devices.utils.bec_scaninfo_mixin import BecScaninfoMixin
from ophyd_devices.utils.errors import DeviceStopError, DeviceTimeoutError
@pytest.fixture
def detector_base():
    """Provide a ``BECDeviceBase`` named ``"test_detector"`` to each test."""
    device = BECDeviceBase(name="test_detector")
    yield device
def test_detector_base_init(detector_base):
    """A freshly constructed device exposes the expected default attributes."""
    # Plain state set during construction.
    assert detector_base.stopped is False
    assert detector_base.name == "test_detector"

    # Helper objects attached to the device.
    service_config = detector_base.filewriter.service_config
    assert "base_path" in service_config
    assert isinstance(detector_base.scaninfo, BecScaninfoMixin)
    assert issubclass(detector_base.custom_prepare_cls, CustomPrepare)
def test_stage(detector_base):
    """``stage`` refuses an already-staged device and otherwise runs its hooks."""
    # Staging a device that is already marked as staged must be rejected.
    detector_base._staged = Staged.yes
    with pytest.raises(RedundantStaging):
        detector_base.stage()
    assert detector_base.stopped is False

    # From the unstaged state, stage() calls each patched hook exactly once.
    detector_base._staged = Staged.no
    on_stage_patch = mock.patch.object(detector_base.custom_prepare, "on_stage")
    load_metadata_patch = mock.patch.object(detector_base.scaninfo, "load_scan_metadata")
    with on_stage_patch as on_stage_hook, load_metadata_patch as load_metadata_hook:
        staged_result = detector_base.stage()
        assert isinstance(staged_result, list)
        on_stage_hook.assert_called_once()
        load_metadata_hook.assert_called_once()
        assert detector_base.stopped is False
def test_pre_scan(detector_base):
    """``pre_scan`` calls ``custom_prepare.on_pre_scan`` exactly once."""
    pre_scan_patch = mock.patch.object(detector_base.custom_prepare, "on_pre_scan")
    with pre_scan_patch as on_pre_scan_hook:
        detector_base.pre_scan()
        on_pre_scan_hook.assert_called_once()
def test_trigger(detector_base):
    """``trigger`` returns its own status, or the one supplied by ``on_trigger``.

    ``on_trigger`` is patched to return ``None`` on the first call and a
    pending ``DeviceStatus`` on the second, so both paths are exercised.
    """
    pending_status = DeviceStatus(detector_base)
    with mock.patch.object(
        detector_base.custom_prepare, "on_trigger", side_effect=[None, pending_status]
    ) as mock_on_trigger:
        # Hook returned None: trigger() still hands back a DeviceStatus.
        returned = detector_base.trigger()
        assert isinstance(returned, DeviceStatus)
        # Give the status a moment to be marked as done before checking it.
        time.sleep(0.1)
        assert returned.done is True

        # Hook returned a status: that very object is passed through, still pending.
        returned = detector_base.trigger()
        assert returned.done is False
        assert returned is pending_status

        # The hook was consulted once per trigger() call.
        assert mock_on_trigger.call_count == 2
def test_unstage(detector_base):
    """``unstage`` returns a list and runs its hooks for both ``stopped`` states."""
    detector_base.stopped = True
    on_unstage_patch = mock.patch.object(detector_base.custom_prepare, "on_unstage")
    check_scan_id_patch = mock.patch.object(detector_base, "check_scan_id")
    with on_unstage_patch as on_unstage_hook, check_scan_id_patch as check_scan_id_hook:
        # First pass: device flagged as stopped.
        unstaged = detector_base.unstage()
        assert isinstance(unstaged, list)
        assert check_scan_id_hook.call_count == 1
        assert on_unstage_hook.call_count == 1

        # Second pass: device not stopped; both hooks are called again.
        detector_base.stopped = False
        unstaged = detector_base.unstage()
        assert isinstance(unstaged, list)
        assert check_scan_id_hook.call_count == 2
        assert on_unstage_hook.call_count == 2
def test_complete(detector_base):
    """``complete`` returns its own status, or the one supplied by ``on_complete``.

    ``on_complete`` is patched to return ``None`` on the first call and a
    pending ``DeviceStatus`` on the second, so both paths are exercised.
    """
    pending_status = DeviceStatus(detector_base)
    with mock.patch.object(
        detector_base.custom_prepare, "on_complete", side_effect=[None, pending_status]
    ) as mock_on_complete:
        # Hook returned None: complete() still hands back a DeviceStatus.
        returned = detector_base.complete()
        assert isinstance(returned, DeviceStatus)
        # Give the status a moment to be marked as done before checking it.
        time.sleep(0.1)
        assert returned.done is True

        # Hook returned a status: that very object is passed through, still pending.
        returned = detector_base.complete()
        assert returned.done is False
        assert returned is pending_status

        # The hook was consulted once per complete() call.
        assert mock_on_complete.call_count == 2
def test_stop(detector_base):
    """``stop`` calls ``custom_prepare.on_stop`` once and sets ``stopped``."""
    stop_patch = mock.patch.object(detector_base.custom_prepare, "on_stop")
    with stop_patch as on_stop_hook:
        detector_base.stop()
        on_stop_hook.assert_called_once()
        assert detector_base.stopped is True
def test_check_scan_id(detector_base):
    """``check_scan_id`` sets ``stopped`` on the first call but not on the second.

    NOTE(review): presumably the first call sees a scan id that differs from
    the preset ``"abcde"`` and the second call sees no change -- confirm
    against ``BECDeviceBase.check_scan_id``.
    """
    detector_base.scaninfo.scan_id = "abcde"

    # First call: the device ends up flagged as stopped.
    detector_base.stopped = False
    detector_base.check_scan_id()
    assert detector_base.stopped is True

    # Second call after resetting the flag: it stays cleared.
    detector_base.stopped = False
    detector_base.check_scan_id()
    assert detector_base.stopped is False
def test_wait_for_signal(detector_base):
    """``wait_with_status`` resolves on stop, on a matching value, and on timeout.

    Three scenarios are run back to back against the same device:

    1. the device is stopped while waiting -> status fails with ``DeviceStopError``;
    2. the polled callback starts returning the expected value -> status succeeds;
    3. the value never matches within the timeout -> status fails with the
       exception object passed as ``exception_on_timeout``.
    """
    my_value = False

    def my_callback():
        # Reads the enclosing variable on every call, so rebinding ``my_value``
        # below changes what the waiter observes.
        return my_value

    status = detector_base.custom_prepare.wait_with_status(
        [(my_callback, True)],
        check_stopped=True,
        timeout=5,
        interval=0.01,
        exception_on_timeout=None,
    )
    time.sleep(0.1)
    assert status.done is False

    # Scenario 1: the wait ends once detector_base.stop() is called.
    detector_base.stop()
    # Some delay to allow the stop to take effect.
    time.sleep(0.15)
    assert status.done is True
    assert status.exception().args == DeviceStopError(f"{detector_base.name} was stopped").args

    # Scenario 2: the status resolves successfully once the expected value is set.
    detector_base.stopped = False
    status = detector_base.custom_prepare.wait_with_status(
        [(my_callback, True)],
        check_stopped=True,
        timeout=5,
        interval=0.01,
        exception_on_timeout=None,
    )
    my_value = True
    # Some delay to allow the waiter to pick up the new value.
    time.sleep(0.15)
    assert status.done is True
    assert status.success is True
    assert status.exception() is None

    # Scenario 3: the wait runs into the timeout and reports the expected exception.
    detector_base.stopped = False
    my_value = "random_value"
    exception = TimeoutError("Timeout")
    status = detector_base.custom_prepare.wait_with_status(
        [(my_callback, True)],
        check_stopped=True,
        timeout=0.01,
        interval=0.01,
        exception_on_timeout=exception,
    )
    time.sleep(0.2)
    assert status.done is True
    # The very same exception object must be attached to the status.
    assert status.exception() is exception
    assert status.success is False
def test_wait_for_signal_returns_exception(detector_base):
    """A timed-out ``wait_with_status`` carries the custom or the default exception."""
    my_value = False

    def my_callback():
        # Always False here, so the expected value True is never reached.
        return my_value

    # Case 1: a caller-supplied exception is attached to the status on timeout.
    custom_error = TimeoutError("Timeout")
    timed_out = detector_base.custom_prepare.wait_with_status(
        [(my_callback, True)],
        check_stopped=True,
        timeout=0.01,
        interval=0.01,
        exception_on_timeout=custom_error,
    )
    time.sleep(0.2)
    assert timed_out.done is True
    assert timed_out.exception() is custom_error
    assert timed_out.success is False

    # Case 2: without a custom exception, a DeviceTimeoutError with the
    # standard message is attached instead.
    detector_base.stopped = False
    timed_out = detector_base.custom_prepare.wait_with_status(
        [(my_callback, True)],
        check_stopped=True,
        timeout=0.01,
        interval=0.01,
        exception_on_timeout=None,
    )
    time.sleep(0.2)
    assert timed_out.done is True
    expected_error = DeviceTimeoutError(
        f"Timeout error for {detector_base.name} while waiting for signals {[(my_callback, True)]}"
    )
    assert timed_out.exception().args == expected_error.args
    assert timed_out.success is False