ophyd_devices/tests/test_base_classes.py

221 lines
7.2 KiB
Python

# pylint: skip-file
import time
from unittest import mock
import pytest
from ophyd import DeviceStatus, Staged
from ophyd.utils.errors import RedundantStaging
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd_devices.utils.errors import DeviceStopError, DeviceTimeoutError
@pytest.fixture
def detector_base():
    """Yield a ``PSIDeviceBase`` instance named ``test_detector``."""
    device = PSIDeviceBase(name="test_detector")
    yield device
def test_detector_base_init(detector_base):
    """Check the state of a newly constructed device.

    The test expects the device to be not stopped, not staged and not
    destroyed, and to carry the name ``test_detector``.
    """
    assert detector_base.stopped is False
    assert detector_base.name == "test_detector"
    assert detector_base.staged == Staged.no
    # Identity check, consistent with the ``stopped`` assertion above;
    # ``== False`` would also accept falsy non-bool values such as 0.
    assert detector_base.destroyed is False
def test_stage(detector_base):
    """Exercise ``stage()`` with the ``on_stage`` hook patched out.

    The test expects that:
      * the first ``stage()`` call returns a list and invokes ``on_stage``,
      * a second ``stage()`` call without unstaging raises
        ``RedundantStaging``,
      * after ``_staged`` is reset by hand, ``stage()`` clears a previously
        set ``stopped`` flag and ``on_stage`` has been invoked twice in total.
    """
    # Preconditions on the device before staging.
    assert detector_base._staged == Staged.no
    assert detector_base.stopped is False
    detector_base._staged = Staged.no

    on_stage_patcher = mock.patch.object(detector_base, "on_stage")
    with on_stage_patcher as mock_on_stage:
        # First staging: returns a list and reaches the hook.
        staged_result = detector_base.stage()
        assert isinstance(staged_result, list)
        assert mock_on_stage.called is True

        # Staging again without unstaging must be rejected.
        with pytest.raises(RedundantStaging):
            detector_base.stage()

        # Reset the staged state by hand and mark the device as stopped, then
        # stage again; the test expects the stopped flag to be cleared.
        detector_base._staged = Staged.no
        detector_base.stopped = True
        detector_base.stage()
        assert detector_base.stopped is False
        # Only the two successful stage() calls are expected to hit the hook.
        assert mock_on_stage.call_count == 2
# def test_stage(detector_base):
# detector_base._staged = Staged.yes
# with pytest.raises(RedundantStaging):
# detector_base.stage()
# assert detector_base.stopped is False
# detector_base._staged = Staged.no
# with (
# mock.patch.object(detector_base.custom_prepare, "on_stage") as mock_on_stage,
# mock.patch.object(detector_base.scaninfo, "load_scan_metadata") as mock_load_metadata,
# ):
# rtr = detector_base.stage()
# assert isinstance(rtr, list)
# mock_on_stage.assert_called_once()
# mock_load_metadata.assert_called_once()
# assert detector_base.stopped is False
# def test_pre_scan(detector_base):
# with mock.patch.object(detector_base.custom_prepare, "on_pre_scan") as mock_on_pre_scan:
# detector_base.pre_scan()
# mock_on_pre_scan.assert_called_once()
# def test_trigger(detector_base):
# status = DeviceStatus(detector_base)
# with mock.patch.object(
# detector_base.custom_prepare, "on_trigger", side_effect=[None, status]
# ) as mock_on_trigger:
# st = detector_base.trigger()
# assert isinstance(st, DeviceStatus)
# time.sleep(0.1)
# assert st.done is True
# st = detector_base.trigger()
# assert st.done is False
# assert id(st) == id(status)
# def test_unstage(detector_base):
# detector_base.stopped = True
# with (
# mock.patch.object(detector_base.custom_prepare, "on_unstage") as mock_on_unstage,
# mock.patch.object(detector_base, "check_scan_id") as mock_check_scan_id,
# ):
# rtr = detector_base.unstage()
# assert isinstance(rtr, list)
# assert mock_check_scan_id.call_count == 1
# assert mock_on_unstage.call_count == 1
# detector_base.stopped = False
# rtr = detector_base.unstage()
# assert isinstance(rtr, list)
# assert mock_check_scan_id.call_count == 2
# assert mock_on_unstage.call_count == 2
# def test_complete(detector_base):
# status = DeviceStatus(detector_base)
# with mock.patch.object(
# detector_base.custom_prepare, "on_complete", side_effect=[None, status]
# ) as mock_on_complete:
# st = detector_base.complete()
# assert isinstance(st, DeviceStatus)
# time.sleep(0.1)
# assert st.done is True
# st = detector_base.complete()
# assert st.done is False
# assert id(st) == id(status)
# def test_stop(detector_base):
# with mock.patch.object(detector_base.custom_prepare, "on_stop") as mock_on_stop:
# detector_base.stop()
# mock_on_stop.assert_called_once()
# assert detector_base.stopped is True
# def test_check_scan_id(detector_base):
# detector_base.scaninfo.scan_id = "abcde"
# detector_base.stopped = False
# detector_base.check_scan_id()
# assert detector_base.stopped is True
# detector_base.stopped = False
# detector_base.check_scan_id()
# assert detector_base.stopped is False
# def test_wait_for_signal(detector_base):
# my_value = False
# def my_callback():
# return my_value
# detector_base
# status = detector_base.custom_prepare.wait_with_status(
# [(my_callback, True)],
# check_stopped=True,
# timeout=5,
# interval=0.01,
# exception_on_timeout=None,
# )
# time.sleep(0.1)
# assert status.done is False
# # Check first that it is stopped when detector_base.stop() is called
# detector_base.stop()
# # some delay to allow the stop to take effect
# time.sleep(0.15)
# assert status.done is True
# assert status.exception().args == DeviceStopError(f"{detector_base.name} was stopped").args
# detector_base.stopped = False
# status = detector_base.custom_prepare.wait_with_status(
# [(my_callback, True)],
# check_stopped=True,
# timeout=5,
# interval=0.01,
# exception_on_timeout=None,
# )
# # Check that thread resolves when expected value is set
# my_value = True
# # some delay to allow the stop to take effect
# time.sleep(0.15)
# assert status.done is True
# assert status.success is True
# assert status.exception() is None
# detector_base.stopped = False
# # Check that wait for status runs into timeout with expected exception
# my_value = "random_value"
# exception = TimeoutError("Timeout")
# status = detector_base.custom_prepare.wait_with_status(
# [(my_callback, True)],
# check_stopped=True,
# timeout=0.01,
# interval=0.01,
# exception_on_timeout=exception,
# )
# time.sleep(0.2)
# assert status.done is True
# assert id(status.exception()) == id(exception)
# assert status.success is False
# def test_wait_for_signal_returns_exception(detector_base):
# my_value = False
# def my_callback():
# return my_value
# # Check that wait for status runs into timeout with expected exception
# exception = TimeoutError("Timeout")
# status = detector_base.custom_prepare.wait_with_status(
# [(my_callback, True)],
# check_stopped=True,
# timeout=0.01,
# interval=0.01,
# exception_on_timeout=exception,
# )
# time.sleep(0.2)
# assert status.done is True
# assert id(status.exception()) == id(exception)
# assert status.success is False
# detector_base.stopped = False
# # Check that standard exception is thrown
# status = detector_base.custom_prepare.wait_with_status(
# [(my_callback, True)],
# check_stopped=True,
# timeout=0.01,
# interval=0.01,
# exception_on_timeout=None,
# )
# time.sleep(0.2)
# assert status.done is True
# assert (
# status.exception().args
# == DeviceTimeoutError(
# f"Timeout error for {detector_base.name} while waiting for signals {[(my_callback, True)]}"
# ).args
# )
# assert status.success is False