# pylint: skip-file import time from unittest import mock import pytest from ophyd import DeviceStatus, Staged from ophyd.utils.errors import RedundantStaging from ophyd_devices.interfaces.base_classes.bec_device_base import BECDeviceBase, CustomPrepare from ophyd_devices.utils.bec_scaninfo_mixin import BecScaninfoMixin from ophyd_devices.utils.errors import DeviceStopError, DeviceTimeoutError @pytest.fixture def detector_base(): yield BECDeviceBase(name="test_detector") def test_detector_base_init(detector_base): assert detector_base.stopped is False assert detector_base.name == "test_detector" assert "base_path" in detector_base.filewriter.service_config assert isinstance(detector_base.scaninfo, BecScaninfoMixin) assert issubclass(detector_base.custom_prepare_cls, CustomPrepare) def test_stage(detector_base): detector_base._staged = Staged.yes with pytest.raises(RedundantStaging): detector_base.stage() assert detector_base.stopped is False detector_base._staged = Staged.no with ( mock.patch.object(detector_base.custom_prepare, "on_stage") as mock_on_stage, mock.patch.object(detector_base.scaninfo, "load_scan_metadata") as mock_load_metadata, ): rtr = detector_base.stage() assert isinstance(rtr, list) mock_on_stage.assert_called_once() mock_load_metadata.assert_called_once() assert detector_base.stopped is False def test_pre_scan(detector_base): with mock.patch.object(detector_base.custom_prepare, "on_pre_scan") as mock_on_pre_scan: detector_base.pre_scan() mock_on_pre_scan.assert_called_once() def test_trigger(detector_base): status = DeviceStatus(detector_base) with mock.patch.object( detector_base.custom_prepare, "on_trigger", side_effect=[None, status] ) as mock_on_trigger: st = detector_base.trigger() assert isinstance(st, DeviceStatus) time.sleep(0.1) assert st.done is True st = detector_base.trigger() assert st.done is False assert id(st) == id(status) def test_unstage(detector_base): detector_base.stopped = True with ( mock.patch.object(detector_base.custom_prepare, "on_unstage") as mock_on_unstage, mock.patch.object(detector_base, "check_scan_id") as mock_check_scan_id, ): rtr = detector_base.unstage() assert isinstance(rtr, list) assert mock_check_scan_id.call_count == 1 assert mock_on_unstage.call_count == 1 detector_base.stopped = False rtr = detector_base.unstage() assert isinstance(rtr, list) assert mock_check_scan_id.call_count == 2 assert mock_on_unstage.call_count == 2 def test_complete(detector_base): status = DeviceStatus(detector_base) with mock.patch.object( detector_base.custom_prepare, "on_complete", side_effect=[None, status] ) as mock_on_complete: st = detector_base.complete() assert isinstance(st, DeviceStatus) time.sleep(0.1) assert st.done is True st = detector_base.complete() assert st.done is False assert id(st) == id(status) def test_stop(detector_base): with mock.patch.object(detector_base.custom_prepare, "on_stop") as mock_on_stop: detector_base.stop() mock_on_stop.assert_called_once() assert detector_base.stopped is True def test_check_scan_id(detector_base): detector_base.scaninfo.scan_id = "abcde" detector_base.stopped = False detector_base.check_scan_id() assert detector_base.stopped is True detector_base.stopped = False detector_base.check_scan_id() assert detector_base.stopped is False def test_wait_for_signal(detector_base): my_value = False def my_callback(): return my_value detector_base status = detector_base.custom_prepare.wait_with_status( [(my_callback, True)], check_stopped=True, timeout=5, interval=0.01, exception_on_timeout=None, ) time.sleep(0.1) assert status.done is False # Check first that it is stopped when detector_base.stop() is called detector_base.stop() # some delay to allow the stop to take effect time.sleep(0.15) assert status.done is True assert status.exception().args == DeviceStopError(f"{detector_base.name} was stopped").args detector_base.stopped = False status = detector_base.custom_prepare.wait_with_status( [(my_callback, True)], check_stopped=True, timeout=5, interval=0.01, exception_on_timeout=None, ) # Check that thread resolves when expected value is set my_value = True # some delay to allow the stop to take effect time.sleep(0.15) assert status.done is True assert status.success is True assert status.exception() is None detector_base.stopped = False # Check that wait for status runs into timeout with expectd exception my_value = "random_value" exception = TimeoutError("Timeout") status = detector_base.custom_prepare.wait_with_status( [(my_callback, True)], check_stopped=True, timeout=0.01, interval=0.01, exception_on_timeout=exception, ) time.sleep(0.2) assert status.done is True assert id(status.exception()) == id(exception) assert status.success is False def test_wait_for_signal_returns_exception(detector_base): my_value = False def my_callback(): return my_value # Check that wait for status runs into timeout with expectd exception exception = TimeoutError("Timeout") status = detector_base.custom_prepare.wait_with_status( [(my_callback, True)], check_stopped=True, timeout=0.01, interval=0.01, exception_on_timeout=exception, ) time.sleep(0.2) assert status.done is True assert id(status.exception()) == id(exception) assert status.success is False detector_base.stopped = False # Check that standard exception is thrown status = detector_base.custom_prepare.wait_with_status( [(my_callback, True)], check_stopped=True, timeout=0.01, interval=0.01, exception_on_timeout=None, ) time.sleep(0.2) assert status.done is True assert ( status.exception().args == DeviceTimeoutError( f"Timeout error for {detector_base.name} while waiting for signals {[(my_callback, True)]}" ).args ) assert status.success is False