mirror of
https://github.com/bec-project/ophyd_devices.git
synced 2026-02-20 17:28:42 +01:00
fix(status): Add wrappers for ophyd status objects to improve error handling
This commit is contained in:
@@ -12,6 +12,14 @@ from ophyd import Device, EpicsSignalRO, Signal
|
||||
from ophyd.status import WaitTimeoutError
|
||||
from typeguard import TypeCheckError
|
||||
|
||||
from ophyd_devices import (
|
||||
AndStatus,
|
||||
DeviceStatus,
|
||||
MoveStatus,
|
||||
Status,
|
||||
StatusBase,
|
||||
SubscriptionStatus,
|
||||
)
|
||||
from ophyd_devices.tests.utils import MockPV
|
||||
from ophyd_devices.utils.bec_signals import (
|
||||
AsyncMultiSignal,
|
||||
@@ -76,8 +84,8 @@ def test_utils_file_handler_has_full_path(file_handler):
|
||||
|
||||
def test_utils_task_status(device):
|
||||
"""Test TaskStatus creation"""
|
||||
status = TaskStatus(device=device)
|
||||
assert status.device.name == "device"
|
||||
status = TaskStatus(device)
|
||||
assert status.obj.name == "device"
|
||||
assert status.state == "not_started"
|
||||
assert status.task_id == status._task_id
|
||||
status.state = "running"
|
||||
@@ -929,3 +937,68 @@ def test_transition_status_with_mock_pv(
|
||||
status.wait(timeout=1)
|
||||
assert status.done is False
|
||||
assert status.success is False
|
||||
|
||||
|
||||
def test_patched_status_objects():
|
||||
"""Test the patched Status objects in ophyd_devices that improve error handling."""
|
||||
|
||||
# StatusBase & AndStatus
|
||||
st = StatusBase()
|
||||
st2 = StatusBase()
|
||||
and_st = st & st2
|
||||
assert isinstance(and_st, AndStatus)
|
||||
st.set_exception(ValueError("test error"))
|
||||
with pytest.raises(ValueError):
|
||||
and_st.wait(timeout=10)
|
||||
|
||||
# DeviceStatus & StatusBase
|
||||
dev = Device(name="device")
|
||||
dev_status = DeviceStatus(device=dev)
|
||||
assert dev_status.device == dev
|
||||
dev_status.set_exception(RuntimeError("device error"))
|
||||
|
||||
# Combine DeviceStatus with StatusBase and form AndStatus
|
||||
st = StatusBase(obj=dev)
|
||||
assert st.obj == dev
|
||||
dev_st = DeviceStatus(device=dev)
|
||||
combined_st = st & dev_st
|
||||
st.set_finished()
|
||||
dev_st.set_exception(RuntimeError("combined error"))
|
||||
with pytest.raises(RuntimeError):
|
||||
combined_st.wait(timeout=10)
|
||||
|
||||
# SubscriptionStatus
|
||||
sig = Signal(name="test_signal", value=0)
|
||||
|
||||
def _cb(*args, **kwargs):
|
||||
pass
|
||||
|
||||
sub_st = SubscriptionStatus(sig, callback=_cb)
|
||||
sub_st.set_exception(ValueError("subscription error"))
|
||||
with pytest.raises(ValueError):
|
||||
sub_st.wait(timeout=10)
|
||||
assert sub_st.done is True
|
||||
assert sub_st.success is False
|
||||
|
||||
# MoveStatus, here the default for call_stop_on_failure is True
|
||||
class Positioner(Device):
|
||||
SUB_READBACK = "readback"
|
||||
setpoint = Signal(name="setpoint", value=0)
|
||||
readback = Signal(name="readback", value=0)
|
||||
|
||||
@property
|
||||
def position(self):
|
||||
return self.readback.get()
|
||||
|
||||
def stop(self):
|
||||
pass
|
||||
|
||||
pos = Positioner(name="positioner")
|
||||
move_st = MoveStatus(pos, target=10)
|
||||
with mock.patch.object(pos, "stop") as mock_stop:
|
||||
move_st.set_exception(RuntimeError("move error"))
|
||||
mock_stop.assert_called_once()
|
||||
with pytest.raises(RuntimeError):
|
||||
move_st.wait(timeout=10)
|
||||
assert move_st.done is True
|
||||
assert move_st.success is False
|
||||
|
||||
Reference in New Issue
Block a user