mirror of
https://github.com/bec-project/ophyd_devices.git
synced 2025-06-12 14:27:14 +02:00
feat: add option to return DeviceStatus for on_trigger, on_complete; extend wait_for_signals
This commit is contained in:
@ -1,4 +1,5 @@
|
||||
# pylint: skip-file
|
||||
import time
|
||||
from unittest import mock
|
||||
|
||||
import pytest
|
||||
@ -49,10 +50,17 @@ def test_pre_scan(detector_base):
|
||||
|
||||
|
||||
def test_trigger(detector_base):
|
||||
with mock.patch.object(detector_base.custom_prepare, "on_trigger") as mock_on_trigger:
|
||||
rtr = detector_base.trigger()
|
||||
assert isinstance(rtr, DeviceStatus)
|
||||
mock_on_trigger.assert_called_once()
|
||||
status = DeviceStatus(detector_base)
|
||||
with mock.patch.object(
|
||||
detector_base.custom_prepare, "on_trigger", side_effect=[None, status]
|
||||
) as mock_on_trigger:
|
||||
st = detector_base.trigger()
|
||||
assert isinstance(st, DeviceStatus)
|
||||
time.sleep(0.1)
|
||||
assert st.done is True
|
||||
st = detector_base.trigger()
|
||||
assert st.done is False
|
||||
assert id(st) == id(status)
|
||||
|
||||
|
||||
def test_unstage(detector_base):
|
||||
@ -74,9 +82,17 @@ def test_unstage(detector_base):
|
||||
|
||||
|
||||
def test_complete(detector_base):
|
||||
with mock.patch.object(detector_base.custom_prepare, "on_complete") as mock_on_complete:
|
||||
detector_base.complete()
|
||||
mock_on_complete.assert_called_once()
|
||||
status = DeviceStatus(detector_base)
|
||||
with mock.patch.object(
|
||||
detector_base.custom_prepare, "on_complete", side_effect=[None, status]
|
||||
) as mock_on_complete:
|
||||
st = detector_base.complete()
|
||||
assert isinstance(st, DeviceStatus)
|
||||
time.sleep(0.1)
|
||||
assert st.done is True
|
||||
st = detector_base.complete()
|
||||
assert st.done is False
|
||||
assert id(st) == id(status)
|
||||
|
||||
|
||||
def test_stop(detector_base):
|
||||
@ -94,3 +110,38 @@ def test_check_scan_id(detector_base):
|
||||
detector_base.stopped = False
|
||||
detector_base.check_scan_id()
|
||||
assert detector_base.stopped is False
|
||||
|
||||
|
||||
def test_wait_for_signal(detector_base):
|
||||
expected_value = "test"
|
||||
exception = TimeoutError("Timeout")
|
||||
status = detector_base.custom_prepare.wait_with_status(
|
||||
[(detector_base.filepath.get, expected_value)],
|
||||
check_stopped=True,
|
||||
timeout=5,
|
||||
interval=0.01,
|
||||
exception_on_timeout=exception,
|
||||
)
|
||||
time.sleep(0.1)
|
||||
assert status.done is False
|
||||
# Check first that it is stopped when detector_base.stop() is called
|
||||
detector_base.stop()
|
||||
# some delay to allow the stop to take effect
|
||||
time.sleep(0.15)
|
||||
assert status.done is True
|
||||
assert id(status.exception()) == id(exception)
|
||||
detector_base.stopped = False
|
||||
status = detector_base.custom_prepare.wait_with_status(
|
||||
[(detector_base.filepath.get, expected_value)],
|
||||
check_stopped=True,
|
||||
timeout=5,
|
||||
interval=0.01,
|
||||
exception_on_timeout=exception,
|
||||
)
|
||||
# Check that thread resolves when expected value is set
|
||||
detector_base.filepath.set(expected_value)
|
||||
# some delay to allow the stop to take effect
|
||||
time.sleep(0.15)
|
||||
assert status.done is True
|
||||
assert status.success is True
|
||||
assert status.exception() is None
|
||||
|
Reference in New Issue
Block a user