feat(ophyd): temporary until new Ophyd release, prevent Status objects threads

Monkey-patching of Ophyd library
This commit is contained in:
2024-03-18 15:32:12 +01:00
committed by guijar_m
parent c9d1e0db05
commit df8ce79ca0
3 changed files with 105 additions and 0 deletions

View File

@ -0,0 +1,51 @@
import pytest
import threading
import time
from unittest.mock import Mock
import ophyd_devices # ensure we are patched
from ophyd.status import StatusBase, StatusTimeoutError
def test_ophyd_status_patch():
cb = Mock()
st = StatusBase(timeout=1)
assert isinstance(st._callback_thread, threading.Thread)
st.add_callback(cb)
with pytest.raises(StatusTimeoutError):
time.sleep(1.1)
st.wait()
cb.assert_called_once()
cb.reset_mock()
st = StatusBase()
assert isinstance(st._callback_thread, Mock)
st.add_callback(cb)
st.set_finished()
cb.assert_called_once()
cb.reset_mock()
st.wait()
st = StatusBase(settle_time=1)
st.add_callback(cb)
assert isinstance(st._callback_thread, Mock)
st.set_finished()
assert cb.call_count == 0
time.sleep(0.5)
assert cb.call_count == 0 # not yet!
time.sleep(0.6)
cb.assert_called_once()
cb.reset_mock()
st.wait()
class TestException(RuntimeError):
pass
st = StatusBase()
st.add_callback(cb)
st.set_exception(TestException())
cb.assert_called_once()
with pytest.raises(TestException):
st.wait()