test: update tests

This commit is contained in:
2024-12-04 15:48:18 +01:00
parent 8f51789f5b
commit a1da3a5f40
3 changed files with 40 additions and 28 deletions

View File

@ -6,10 +6,8 @@ import pytest
from ophyd import DeviceStatus, Staged
from ophyd.utils.errors import RedundantStaging
from ophyd_devices.interfaces.base_classes.psi_detector_base import (
CustomDetectorMixin,
PSIDetectorBase,
)
from ophyd_devices.interfaces.base_classes.psi_detector_base import PSIDetectorBase
from ophyd_devices.interfaces.base_classes.psi_device_base import CustomPrepare
from ophyd_devices.utils.bec_scaninfo_mixin import BecScaninfoMixin
from ophyd_devices.utils.errors import DeviceStopError, DeviceTimeoutError
@ -24,7 +22,7 @@ def test_detector_base_init(detector_base):
assert detector_base.name == "test_detector"
assert "base_path" in detector_base.filewriter.service_config
assert isinstance(detector_base.scaninfo, BecScaninfoMixin)
assert issubclass(detector_base.custom_prepare_cls, CustomDetectorMixin)
assert issubclass(detector_base.custom_prepare_cls, CustomPrepare)
def test_stage(detector_base):
@ -113,9 +111,14 @@ def test_check_scan_id(detector_base):
def test_wait_for_signal(detector_base):
expected_value = "test"
my_value = False
def my_callback():
return my_value
detector_base
status = detector_base.custom_prepare.wait_with_status(
[(detector_base.filepath.get, expected_value)],
[(my_callback, True)],
check_stopped=True,
timeout=5,
interval=0.01,
@ -131,14 +134,14 @@ def test_wait_for_signal(detector_base):
assert status.exception().args == DeviceStopError(f"{detector_base.name} was stopped").args
detector_base.stopped = False
status = detector_base.custom_prepare.wait_with_status(
[(detector_base.filepath.get, expected_value)],
[(my_callback, True)],
check_stopped=True,
timeout=5,
interval=0.01,
exception_on_timeout=None,
)
# Check that thread resolves when expected value is set
detector_base.filepath.set(expected_value)
my_value = True
# some delay to allow the stop to take effect
time.sleep(0.15)
assert status.done is True
@ -147,12 +150,10 @@ def test_wait_for_signal(detector_base):
detector_base.stopped = False
# Check that wait for status runs into timeout with expectd exception
st = detector_base.filepath.set("wrong_value")
st.wait()
assert detector_base.filepath.get() == "wrong_value"
my_value = "random_value"
exception = TimeoutError("Timeout")
status = detector_base.custom_prepare.wait_with_status(
[(detector_base.filepath.get, expected_value)],
[(my_callback, True)],
check_stopped=True,
timeout=0.01,
interval=0.01,
@ -165,12 +166,16 @@ def test_wait_for_signal(detector_base):
def test_wait_for_signal_returns_exception(detector_base):
expected_value = "test"
my_value = False
def my_callback():
return my_value
# Check that wait for status runs into timeout with expectd exception
detector_base.filepath.set("wrong_value")
exception = TimeoutError("Timeout")
status = detector_base.custom_prepare.wait_with_status(
[(detector_base.filepath.get, expected_value)],
[(my_callback, True)],
check_stopped=True,
timeout=0.01,
interval=0.01,
@ -184,7 +189,7 @@ def test_wait_for_signal_returns_exception(detector_base):
detector_base.stopped = False
# Check that standard exception is thrown
status = detector_base.custom_prepare.wait_with_status(
[(detector_base.filepath.get, expected_value)],
[(my_callback, True)],
check_stopped=True,
timeout=0.01,
interval=0.01,
@ -195,7 +200,7 @@ def test_wait_for_signal_returns_exception(detector_base):
assert (
status.exception().args
== DeviceTimeoutError(
f"Timeout error for {detector_base.name} while waiting for signals {[(detector_base.filepath.get, expected_value)]}"
f"Timeout error for {detector_base.name} while waiting for signals {[(my_callback, True)]}"
).args
)
assert status.success is False