mirror of
https://github.com/bec-project/ophyd_devices.git
synced 2025-12-27 07:31:33 +01:00
1024 lines
32 KiB
Python
1024 lines
32 KiB
Python
import threading
|
|
import time
|
|
from functools import partial
|
|
from unittest import mock
|
|
|
|
import numpy as np
|
|
import ophyd
|
|
import pytest
|
|
from bec_lib import messages
|
|
from ophyd import Component as Cpt
|
|
from ophyd import Device, EpicsSignalRO, Signal
|
|
from ophyd.status import WaitTimeoutError
|
|
from typeguard import TypeCheckError
|
|
|
|
from ophyd_devices import (
|
|
AndStatus,
|
|
DeviceStatus,
|
|
MoveStatus,
|
|
Status,
|
|
StatusBase,
|
|
SubscriptionStatus,
|
|
)
|
|
from ophyd_devices.tests.utils import MockPV
|
|
from ophyd_devices.utils.bec_signals import (
|
|
AsyncMultiSignal,
|
|
AsyncSignal,
|
|
BECMessageSignal,
|
|
DynamicSignal,
|
|
FileEventSignal,
|
|
PreviewSignal,
|
|
ProgressSignal,
|
|
)
|
|
from ophyd_devices.utils.psi_device_base_utils import (
|
|
CompareStatus,
|
|
FileHandler,
|
|
TaskHandler,
|
|
TaskKilledError,
|
|
TaskState,
|
|
TaskStatus,
|
|
TransitionStatus,
|
|
)
|
|
|
|
# pylint: disable=protected-access
|
|
# pylint: disable=redefined-outer-name
|
|
|
|
##########################################
|
|
######### Test Task Handler ############
|
|
##########################################
|
|
|
|
|
|
@pytest.fixture(scope="function")
|
|
def mock_epics_signal_ro():
|
|
name = "epics_signal_ro"
|
|
read_pv = "TEST:EPICS_SIGNAL_RO"
|
|
with mock.patch.object(ophyd, "cl") as mock_cl:
|
|
mock_cl.get_pv = MockPV
|
|
mock_cl.thread_class = threading.Thread
|
|
dev = EpicsSignalRO(name=name, read_pv=read_pv)
|
|
yield dev
|
|
|
|
|
|
@pytest.fixture
|
|
def file_handler():
|
|
"""Fixture for FileHandler"""
|
|
yield FileHandler()
|
|
|
|
|
|
@pytest.fixture
|
|
def device():
|
|
"""Fixture for Device"""
|
|
yield Device(name="device")
|
|
|
|
|
|
@pytest.fixture
|
|
def task_handler(device):
|
|
"""Fixture for TaskHandler"""
|
|
yield TaskHandler(parent=device)
|
|
|
|
|
|
def test_utils_file_handler_has_full_path(file_handler):
|
|
"""Ensure that file_handler has a get_full_path method"""
|
|
assert hasattr(file_handler, "get_full_path")
|
|
|
|
|
|
def test_utils_task_status(device):
|
|
"""Test TaskStatus creation"""
|
|
status = TaskStatus(device)
|
|
assert status.obj.name == "device"
|
|
assert status.state == "not_started"
|
|
assert status.task_id == status._task_id
|
|
status.state = "running"
|
|
assert status.state == TaskState.RUNNING
|
|
status.state = TaskState.COMPLETED
|
|
assert status.state == "completed"
|
|
|
|
|
|
def test_utils_task_handler_submit_task_with_args(task_handler):
|
|
"""Ensure that task_handler has a submit_task method"""
|
|
|
|
def my_task(input_arg: bool, input_kwarg: bool = False):
|
|
if input_kwarg is True:
|
|
raise ValueError("input_kwarg is True")
|
|
if input_arg is True:
|
|
return True
|
|
return False
|
|
|
|
# This should fail
|
|
with pytest.raises(TypeError):
|
|
status = task_handler.submit_task(my_task)
|
|
status.wait()
|
|
# This should pass
|
|
|
|
task_stopped = threading.Event()
|
|
|
|
def finished_cb():
|
|
task_stopped.set()
|
|
|
|
status = task_handler.submit_task(
|
|
my_task, task_args=(True,), task_kwargs={"input_kwarg": False}
|
|
)
|
|
status.add_callback(finished_cb)
|
|
task_stopped.wait()
|
|
assert status.done is True
|
|
assert status.state == TaskState.COMPLETED
|
|
# This should fail
|
|
task_stopped = threading.Event()
|
|
status = task_handler.submit_task(my_task, task_args=(True,), task_kwargs={"input_kwarg": True})
|
|
with pytest.raises(ValueError):
|
|
status.wait()
|
|
assert status.state == TaskState.ERROR
|
|
assert status.done is True
|
|
assert status.exception().__class__ == ValueError
|
|
|
|
|
|
@pytest.mark.timeout(100)
|
|
def test_utils_task_handler_task_killed(task_handler):
|
|
"""Ensure that task_handler has a submit_task method"""
|
|
# No tasks should be running
|
|
assert len(task_handler._tasks) == 0
|
|
event = threading.Event()
|
|
task_stopped = threading.Event()
|
|
task_started = threading.Event()
|
|
|
|
def finished_cb():
|
|
task_stopped.set()
|
|
|
|
def my_wait_task():
|
|
task_started.set()
|
|
for _ in range(100):
|
|
event.wait(timeout=0.1)
|
|
|
|
# Create task
|
|
status = task_handler.submit_task(my_wait_task, run=False)
|
|
status.add_callback(finished_cb)
|
|
assert status.state == TaskState.NOT_STARTED
|
|
# Start task
|
|
task_handler.start_task(status)
|
|
task_started.wait()
|
|
assert status.state == TaskState.RUNNING
|
|
# Stop task
|
|
task_handler.kill_task(status)
|
|
task_stopped.wait()
|
|
assert status.state == TaskState.KILLED
|
|
assert status.exception().__class__ == TaskKilledError
|
|
|
|
|
|
@pytest.mark.timeout(100)
|
|
def test_utils_task_handler_task_successful(task_handler):
|
|
"""Ensure that the task handler runs a successful task"""
|
|
assert len(task_handler._tasks) == 0
|
|
event = threading.Event()
|
|
task_stopped = threading.Event()
|
|
task_started = threading.Event()
|
|
|
|
def finished_cb():
|
|
task_stopped.set()
|
|
|
|
def my_wait_task():
|
|
task_started.set()
|
|
for _ in range(100):
|
|
ret = event.wait(timeout=0.1)
|
|
if ret is True:
|
|
break
|
|
|
|
status = task_handler.submit_task(my_wait_task, run=False)
|
|
status.add_callback(finished_cb)
|
|
task_handler.start_task(status)
|
|
task_started.wait()
|
|
assert status.state == TaskState.RUNNING
|
|
event.set()
|
|
task_stopped.wait()
|
|
assert status.state == TaskState.COMPLETED
|
|
|
|
|
|
def test_utils_task_handler_shutdown(task_handler):
|
|
"""Test to shutdown the handler"""
|
|
|
|
task_completed_cb1 = threading.Event()
|
|
task_completed_cb2 = threading.Event()
|
|
|
|
def finished_cb1():
|
|
task_completed_cb1.set()
|
|
|
|
def finished_cb2():
|
|
task_completed_cb2.set()
|
|
|
|
def cb1():
|
|
for _ in range(1000):
|
|
time.sleep(0.2)
|
|
|
|
def cb2():
|
|
for _ in range(1000):
|
|
time.sleep(0.2)
|
|
|
|
status1 = task_handler.submit_task(cb1)
|
|
status1.add_callback(finished_cb1)
|
|
status2 = task_handler.submit_task(cb2)
|
|
status2.add_callback(finished_cb2)
|
|
assert len(task_handler._tasks) == 2
|
|
assert status1.state == TaskState.RUNNING
|
|
assert status2.state == TaskState.RUNNING
|
|
task_handler.shutdown()
|
|
task_completed_cb1.wait()
|
|
task_completed_cb2.wait()
|
|
assert len(task_handler._tasks) == 0
|
|
assert status1.state == TaskState.KILLED
|
|
assert status2.state == TaskState.KILLED
|
|
assert status1.exception().__class__ == TaskKilledError
|
|
|
|
|
|
##########################################
|
|
######### Test PSI cusomt signals ######
|
|
##########################################
|
|
|
|
|
|
def test_utils_bec_message_signal():
|
|
"""Test BECMessageSignal"""
|
|
dev = Device(name="device")
|
|
signal = BECMessageSignal(
|
|
name="bec_message_signal",
|
|
bec_message_type=messages.GUIInstructionMessage,
|
|
value=None,
|
|
parent=dev,
|
|
)
|
|
assert signal.parent == dev
|
|
assert signal._bec_message_type == messages.GUIInstructionMessage
|
|
assert signal._readback is None
|
|
assert signal.name == "bec_message_signal"
|
|
assert signal.describe() == {
|
|
"bec_message_signal": {
|
|
"source": "BECMessageSignal:bec_message_signal",
|
|
"dtype": "GUIInstructionMessage",
|
|
"shape": [],
|
|
"signal_info": {
|
|
"data_type": "raw",
|
|
"saved": True,
|
|
"ndim": 0,
|
|
"scope": "scan",
|
|
"role": "main",
|
|
"enabled": True,
|
|
"rpc_access": False,
|
|
"signals": [("bec_message_signal", 5)],
|
|
"signal_metadata": {},
|
|
"acquisition_group": None,
|
|
},
|
|
}
|
|
}
|
|
# Put works with Message
|
|
msg = messages.GUIInstructionMessage(action="image", parameter={"gui_id": "test"})
|
|
signal.put(msg)
|
|
reading = signal.read()
|
|
assert reading[signal.name]["value"] == msg
|
|
# set works with dict, should call put
|
|
msg_dict = {"action": "image", "parameter": {"gui_id": "test"}}
|
|
status = signal.set(msg_dict)
|
|
assert status.done is True
|
|
reading = signal.read()
|
|
assert reading[signal.name]["value"] == msg
|
|
# Put fails with wrong type
|
|
with pytest.raises(ValueError):
|
|
signal.put("wrong_type")
|
|
# Put fails with wrong dict
|
|
with pytest.raises(ValueError):
|
|
signal.put({"wrong_key": "wrong_value"})
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"input_msg, output_msg",
|
|
[
|
|
(
|
|
messages.DeviceMessage(
|
|
signals={"sig1": {"value": 1}, "sig2": {"value": 2}}, metadata={"info": "test"}
|
|
),
|
|
messages.DeviceMessage(
|
|
signals={"device_data_sig1": {"value": 1}, "device_data_sig2": {"value": 2}},
|
|
metadata={"info": "test"},
|
|
),
|
|
),
|
|
(
|
|
messages.DeviceMessage(
|
|
signals={"device_data_sig1": {"value": 1}, "device_data_sig2": {"value": 2}},
|
|
metadata={"info": "test"},
|
|
),
|
|
messages.DeviceMessage(
|
|
signals={"device_data_sig1": {"value": 1}, "device_data_sig2": {"value": 2}},
|
|
metadata={"info": "test"},
|
|
),
|
|
),
|
|
],
|
|
)
|
|
def test_utils_signal_normalization(input_msg, output_msg):
|
|
"""Test signal normalization utility in BECMessageSignal"""
|
|
|
|
class DeviceWithSignal(Device):
|
|
data = Cpt(AsyncMultiSignal, name="data", signals=["sig1", "sig2"], ndim=0, max_size=1000)
|
|
|
|
dev = DeviceWithSignal(name="device")
|
|
dev.data._normalize_signals(input_msg)
|
|
assert input_msg == output_msg
|
|
|
|
|
|
def test_utils_dynamic_signal():
|
|
"""Test DynamicSignal"""
|
|
dev = Device(name="device")
|
|
signal = DynamicSignal(name="dynamic_signal", signals=["sig1", "sig2"], value=None, parent=dev)
|
|
assert signal.parent == dev
|
|
assert signal._bec_message_type == messages.DeviceMessage
|
|
assert signal._readback is None
|
|
assert signal.name == "dynamic_signal"
|
|
assert signal.signals == [("sig1", 1), ("sig2", 1)]
|
|
assert signal.describe() == {
|
|
"dynamic_signal": {
|
|
"source": "BECMessageSignal:dynamic_signal",
|
|
"dtype": "DeviceMessage",
|
|
"shape": [],
|
|
"signal_info": {
|
|
"data_type": "raw",
|
|
"saved": True,
|
|
"ndim": 1,
|
|
"scope": "scan",
|
|
"role": "main",
|
|
"enabled": True,
|
|
"rpc_access": False,
|
|
"signals": [("sig1", 1), ("sig2", 1)],
|
|
"signal_metadata": {},
|
|
"acquisition_group": None,
|
|
},
|
|
}
|
|
}
|
|
|
|
# Put works with Message
|
|
msg_dict = {"dynamic_signal_sig1": {"value": 1}, "dynamic_signal_sig2": {"value": 2}}
|
|
with pytest.raises(ValueError):
|
|
# Missing metadata
|
|
signal.put(messages.DeviceMessage(signals=msg_dict))
|
|
metadata = {"async_update": {"type": "add", "max_shape": [None, 1000]}}
|
|
msg = messages.DeviceMessage(signals=msg_dict, metadata=metadata)
|
|
signal.put(msg)
|
|
reading = signal.read()
|
|
assert reading[signal.name]["value"] == msg
|
|
# Set works with dict
|
|
status = signal.set(msg_dict, metadata=metadata)
|
|
assert status.done is True
|
|
reading = signal.read()
|
|
assert reading[signal.name]["value"] == msg
|
|
# Put fails with wrong type
|
|
with pytest.raises(TypeCheckError):
|
|
signal.put("wrong_type")
|
|
# Put fails with wrong dict
|
|
with pytest.raises(TypeCheckError):
|
|
signal.put({"wrong_key": "wrong_value"})
|
|
|
|
# Set with acquisition group
|
|
signal.put(msg, acquisition_group="fly-scan")
|
|
reading = signal.read()
|
|
msg.metadata["acquisition_group"] = "fly-scan"
|
|
assert reading[signal.name]["value"] == msg
|
|
|
|
|
|
def test_utils_dynamic_signal_with_defaults():
|
|
"""
|
|
Test DynamicSignal with async_update and acquisition group defaults. If only
|
|
one sub-signal is provided for the dynamic signal, the name of the sub-signal
|
|
will be used in the signals dict and a warning will be issued that the sub-signal
|
|
name is being ignored.
|
|
"""
|
|
dev = Device(name="device")
|
|
create_signal = partial(
|
|
DynamicSignal,
|
|
name="dynamic_signal",
|
|
parent=dev,
|
|
ndim=1,
|
|
value=None,
|
|
async_update={"type": "add", "max_shape": [None, 1000]},
|
|
acquisition_group="fly-scanning",
|
|
)
|
|
signal = create_signal(signals=["sig1", "sig2"])
|
|
val = np.random.random(1000)
|
|
msg_dict = {"dynamic_signal_sig1": {"value": val}}
|
|
signal.put(msg_dict)
|
|
reading = signal.read()
|
|
reading_value = reading[signal.name]["value"].model_dump(exclude={"timestamp"})
|
|
assert reading_value["signals"] == msg_dict
|
|
assert reading_value["metadata"]["async_update"] == {"type": "add", "max_shape": [None, 1000]}
|
|
assert reading_value["metadata"]["acquisition_group"] == "fly-scanning"
|
|
|
|
signal.put(msg_dict, acquisition_group="different-group")
|
|
reading = signal.read()
|
|
assert reading[signal.name]["value"].metadata["acquisition_group"] == "different-group"
|
|
|
|
# Test init variations for single signal
|
|
for signal in [["sig1"], "sig1", None]:
|
|
signal = create_signal(signals=signal)
|
|
assert signal.signals == [(signal.name, ophyd.Kind.hinted.value)]
|
|
|
|
|
|
def test_utils_async_multi_signal():
|
|
"""Test AsyncMultiSignal, which is a DynamicSignal with strict signal validation."""
|
|
device = Device(name="device")
|
|
signal = AsyncMultiSignal(
|
|
name="async_multi_signal",
|
|
ndim=1,
|
|
max_size=1000,
|
|
signals=["sig1", "sig2"],
|
|
async_update={"type": "add", "max_shape": [None, 1000]},
|
|
parent=device,
|
|
)
|
|
val = np.random.random(1000)
|
|
msg_dict = {"async_multi_signal_sig1": {"value": val}}
|
|
with pytest.raises(ValueError):
|
|
# Missing signal
|
|
signal.put(msg_dict)
|
|
msg_dict = {
|
|
"async_multi_signal_sig1": {"value": val},
|
|
"async_multi_signal_sig2": {"value": val},
|
|
}
|
|
signal.put(msg_dict)
|
|
reading = signal.read()
|
|
reading_value = reading[signal.name]["value"].model_dump(exclude={"timestamp"})
|
|
assert reading_value["signals"] == msg_dict
|
|
assert reading_value["metadata"]["async_update"] == {"type": "add", "max_shape": [None, 1000]}
|
|
|
|
|
|
def test_utils_async_signal():
|
|
device = Device(name="device")
|
|
signal = AsyncSignal(
|
|
name="async_signal",
|
|
ndim=1,
|
|
max_size=1000,
|
|
async_update={"type": "add", "max_shape": [None, 200]},
|
|
parent=device,
|
|
)
|
|
val = np.random.random(1000)
|
|
signal.put(
|
|
val,
|
|
async_update={"type": "add_slice", "max_shape": [None, 1000], "index": 1},
|
|
acquisition_group="scan",
|
|
)
|
|
reading = signal.read()
|
|
reading_value = reading[signal.name]["value"].model_dump(exclude={"timestamp"})
|
|
assert np.array_equal(reading_value["signals"][signal.name]["value"], val)
|
|
assert reading_value["metadata"]["async_update"] == {
|
|
"type": "add_slice",
|
|
"max_shape": [None, 1000],
|
|
"index": 1,
|
|
}
|
|
assert reading_value["metadata"]["acquisition_group"] == "scan"
|
|
|
|
|
|
def test_utils_file_event_signal():
|
|
"""Test FileEventSignal"""
|
|
dev = Device(name="device")
|
|
signal = FileEventSignal(name="file_event_signal", value=None, parent=dev)
|
|
assert signal.parent == dev
|
|
assert signal._bec_message_type == messages.FileMessage
|
|
assert signal._readback is None
|
|
assert signal.name == "file_event_signal"
|
|
assert signal.describe() == {
|
|
"file_event_signal": {
|
|
"source": "BECMessageSignal:file_event_signal",
|
|
"dtype": "FileMessage",
|
|
"shape": [],
|
|
"signal_info": {
|
|
"data_type": "raw",
|
|
"saved": False,
|
|
"ndim": 0,
|
|
"scope": "scan",
|
|
"role": "file event",
|
|
"enabled": True,
|
|
"rpc_access": False,
|
|
"signals": [("file_event_signal", 5)],
|
|
"signal_metadata": {},
|
|
"acquisition_group": None,
|
|
},
|
|
}
|
|
}
|
|
|
|
# Test put works with FileMessage
|
|
msg_dict = {"file_path": "/path/to/another/file.txt", "done": False, "successful": True}
|
|
msg = messages.FileMessage(**msg_dict)
|
|
signal.put(msg)
|
|
reading = signal.read()
|
|
assert reading[signal.name]["value"] == msg
|
|
# Test put works with dict
|
|
signal.put(msg_dict)
|
|
reading = signal.read()
|
|
assert reading[signal.name]["value"] == msg
|
|
# Test set with kwargs, should call put
|
|
status = signal.set(file_path="/path/to/another/file.txt", done=False, successful=True)
|
|
assert status.done is True
|
|
reading = signal.read()
|
|
assert reading[signal.name]["value"] == msg
|
|
# Test put fails with wrong type
|
|
with pytest.raises(ValueError):
|
|
signal.put(1)
|
|
# Test put fails with wrong dict
|
|
with pytest.raises(ValueError):
|
|
signal.put({"wrong_key": "wrong_value"})
|
|
|
|
|
|
def test_utils_preview_1d_signal():
|
|
"""Test Preview1DSignal"""
|
|
dev = Device(name="device")
|
|
signal = PreviewSignal(name="preview_1d_signal", ndim=1, value=None, parent=dev)
|
|
assert signal.ndim == 1
|
|
assert signal.parent == dev
|
|
assert signal._bec_message_type == messages.DevicePreviewMessage
|
|
assert signal._readback is None
|
|
assert signal.name == "preview_1d_signal"
|
|
assert signal.describe() == {
|
|
"preview_1d_signal": {
|
|
"source": "BECMessageSignal:preview_1d_signal",
|
|
"dtype": "DevicePreviewMessage",
|
|
"shape": [],
|
|
"signal_info": {
|
|
"data_type": "raw",
|
|
"saved": False,
|
|
"ndim": 1,
|
|
"scope": "scan",
|
|
"role": "preview",
|
|
"enabled": True,
|
|
"rpc_access": False,
|
|
"signals": [("preview_1d_signal", 5)],
|
|
"signal_metadata": {"num_rotation_90": 0, "transpose": False},
|
|
"acquisition_group": None,
|
|
},
|
|
}
|
|
}
|
|
# Put works with Message
|
|
msg_dict = {"device": dev.name, "data": np.array([1, 2, 3]), "signal": "preview_1d_signal"}
|
|
msg = messages.DevicePreviewMessage(**msg_dict)
|
|
signal.put(msg)
|
|
reading = signal.read()
|
|
assert reading[signal.name]["value"].model_dump(exclude="timestamp") == msg.model_dump(
|
|
exclude="timestamp"
|
|
)
|
|
# Put works with dict
|
|
signal.put(msg_dict)
|
|
reading = signal.read()
|
|
assert reading[signal.name]["value"].model_dump(exclude="timestamp") == msg.model_dump(
|
|
exclude="timestamp"
|
|
)
|
|
# Put works with value
|
|
status = signal.set(msg_dict["data"])
|
|
assert status.done is True
|
|
reading = signal.read()
|
|
assert reading[signal.name]["value"].model_dump(exclude="timestamp") == msg.model_dump(
|
|
exclude="timestamp"
|
|
)
|
|
# Put works with value
|
|
signal.put(msg_dict["data"])
|
|
reading = signal.read()
|
|
assert reading[signal.name]["value"].model_dump(exclude="timestamp") == msg.model_dump(
|
|
exclude="timestamp"
|
|
)
|
|
# Put fails with wrong type
|
|
with pytest.raises(ValueError):
|
|
signal.put(1)
|
|
# Put fails with wrong dict
|
|
with pytest.raises(ValueError):
|
|
signal.put({"wrong_key": "wrong_value"})
|
|
|
|
|
|
def test_utils_preview_2d_signal():
|
|
"""Test Preview2DSignal"""
|
|
dev = Device(name="device")
|
|
signal = PreviewSignal(name="preview_2d_signal", ndim=2, value=None, parent=dev)
|
|
assert signal.ndim == 2
|
|
assert signal.parent == dev
|
|
assert signal._bec_message_type == messages.DevicePreviewMessage
|
|
assert signal._readback is None
|
|
assert signal.name == "preview_2d_signal"
|
|
assert signal.describe() == {
|
|
"preview_2d_signal": {
|
|
"source": "BECMessageSignal:preview_2d_signal",
|
|
"dtype": "DevicePreviewMessage",
|
|
"shape": [],
|
|
"signal_info": {
|
|
"data_type": "raw",
|
|
"saved": False,
|
|
"ndim": 2,
|
|
"scope": "scan",
|
|
"role": "preview",
|
|
"enabled": True,
|
|
"rpc_access": False,
|
|
"signals": [("preview_2d_signal", 5)],
|
|
"signal_metadata": {"num_rotation_90": 0, "transpose": False},
|
|
"acquisition_group": None,
|
|
},
|
|
}
|
|
}
|
|
# Put works with Message
|
|
msg_dict = {
|
|
"device": dev.name,
|
|
"data": np.array([[1, 2, 3], [4, 5, 6]]),
|
|
"signal": "preview_2d_signal",
|
|
}
|
|
msg = messages.DevicePreviewMessage(**msg_dict)
|
|
signal.put(msg)
|
|
reading = signal.read()
|
|
assert reading[signal.name]["value"].model_dump(exclude="timestamp") == msg.model_dump(
|
|
exclude="timestamp"
|
|
)
|
|
# Put works with dict
|
|
signal.put(msg_dict)
|
|
reading = signal.read()
|
|
assert reading[signal.name]["value"].model_dump(exclude="timestamp") == msg.model_dump(
|
|
exclude="timestamp"
|
|
)
|
|
# Put works with value
|
|
status = signal.set(msg_dict["data"])
|
|
assert status.done is True
|
|
reading = signal.read()
|
|
assert reading[signal.name]["value"].model_dump(exclude="timestamp") == msg.model_dump(
|
|
exclude="timestamp"
|
|
)
|
|
# Put works with value
|
|
signal.put(msg_dict["data"])
|
|
reading = signal.read()
|
|
assert reading[signal.name]["value"].model_dump(exclude="timestamp") == msg.model_dump(
|
|
exclude="timestamp"
|
|
)
|
|
# Put fails with wrong type
|
|
with pytest.raises(ValueError):
|
|
signal.put(1)
|
|
# Put fails with wrong dict
|
|
with pytest.raises(ValueError):
|
|
signal.put({"wrong_key": "wrong_value"})
|
|
|
|
|
|
def test_utils_progress_signal():
|
|
"""Test ProgressSignal"""
|
|
dev = Device(name="device")
|
|
signal = ProgressSignal(name="progress_signal", value=None, parent=dev)
|
|
assert signal.parent == dev
|
|
assert signal._bec_message_type == messages.ProgressMessage
|
|
assert signal._readback is None
|
|
assert signal.name == "progress_signal"
|
|
assert signal.describe() == {
|
|
"progress_signal": {
|
|
"source": "BECMessageSignal:progress_signal",
|
|
"dtype": "ProgressMessage",
|
|
"shape": [],
|
|
"signal_info": {
|
|
"data_type": "raw",
|
|
"saved": False,
|
|
"ndim": 0,
|
|
"scope": "scan",
|
|
"role": "progress",
|
|
"enabled": True,
|
|
"rpc_access": False,
|
|
"signals": [("progress_signal", 5)],
|
|
"signal_metadata": {},
|
|
"acquisition_group": None,
|
|
},
|
|
}
|
|
}
|
|
# Put works with Message
|
|
msg = messages.ProgressMessage(value=1, max_value=10, done=False)
|
|
signal.put(msg)
|
|
reading = signal.read()
|
|
assert reading[signal.name]["value"] == msg
|
|
# Put works with dict
|
|
msg_dict = {"value": 1, "max_value": 10, "done": False}
|
|
signal.put(msg_dict)
|
|
reading = signal.read()
|
|
assert reading[signal.name]["value"] == msg
|
|
# Works with kwargs
|
|
status = signal.set(value=1, max_value=10, done=False)
|
|
assert status.done is True
|
|
reading = signal.read()
|
|
assert reading[signal.name]["value"] == msg
|
|
# Put fails with wrong type
|
|
with pytest.raises(ValueError):
|
|
signal.put(1)
|
|
# Put fails with wrong dict
|
|
with pytest.raises(ValueError):
|
|
signal.put({"wrong_key": "wrong_value"})
|
|
|
|
|
|
def test_utils_compare_status_number():
|
|
"""Test CompareStatus with different operations."""
|
|
sig = Signal(name="test_signal", value=0)
|
|
status = CompareStatus(signal=sig, value=5, operation_success="==")
|
|
assert status.done is False
|
|
sig.put(1)
|
|
assert status.done is False
|
|
sig.put(5)
|
|
status.wait(timeout=5)
|
|
assert status.done is True
|
|
|
|
sig.put(5)
|
|
# Test with different operations
|
|
status = CompareStatus(signal=sig, value=5, operation_success="!=")
|
|
assert status.done is False
|
|
sig.put(5)
|
|
assert status.done is False
|
|
sig.put(6)
|
|
assert status.done is True
|
|
assert status.success is True
|
|
assert status.exception() is None
|
|
|
|
sig.put(0)
|
|
status = CompareStatus(signal=sig, value=5, operation_success=">")
|
|
assert status.done is False
|
|
sig.put(5)
|
|
assert status.done is False
|
|
sig.put(10)
|
|
assert status.done is True
|
|
assert status.success is True
|
|
assert status.exception() is None
|
|
|
|
# Should raise
|
|
sig.put(0)
|
|
status = CompareStatus(signal=sig, value=5, operation_success="==", failure_value=[10])
|
|
with pytest.raises(ValueError):
|
|
sig.put(10)
|
|
status.wait()
|
|
assert status.done is True
|
|
assert status.success is False
|
|
assert isinstance(status.exception(), ValueError)
|
|
|
|
# failure_operation
|
|
sig.put(0)
|
|
status = CompareStatus(
|
|
signal=sig, value=5, operation_success="==", failure_value=10, operation_failure=">"
|
|
)
|
|
sig.put(10)
|
|
assert status.done is False
|
|
assert status.success is False
|
|
sig.put(11)
|
|
with pytest.raises(ValueError):
|
|
status.wait()
|
|
assert status.done is True
|
|
assert status.success is False
|
|
|
|
# raise if array is returned
|
|
sig.put(0)
|
|
status = CompareStatus(signal=sig, value=5, operation_success="==")
|
|
with pytest.raises(ValueError):
|
|
sig.put([1, 2, 3])
|
|
status.wait(timeout=2)
|
|
assert status.done is True
|
|
assert status.success is False
|
|
|
|
|
|
def test_compare_status_string():
|
|
"""Test CompareStatus with string values"""
|
|
sig = Signal(name="test_signal", value="test")
|
|
status = CompareStatus(signal=sig, value="test", operation_success="==")
|
|
assert status.done is False
|
|
sig.put("test1")
|
|
assert status.done is False
|
|
sig.put("test")
|
|
assert status.done is True
|
|
|
|
sig.put("test")
|
|
# Test with different operations
|
|
status = CompareStatus(signal=sig, value="test", operation_success="!=")
|
|
assert status.done is False
|
|
sig.put("test")
|
|
assert status.done is False
|
|
sig.put("test1")
|
|
assert status.done is True
|
|
assert status.success is True
|
|
assert status.exception() is None
|
|
|
|
|
|
def test_transition_status():
|
|
"""Test TransitionStatus"""
|
|
sig = Signal(name="test_signal", value=0)
|
|
|
|
# Test strict=True, without intermediate transitions
|
|
sig.put(0)
|
|
status = TransitionStatus(signal=sig, transitions=[1, 2, 3], strict=True)
|
|
|
|
assert status.done is False
|
|
sig.put(1)
|
|
assert status.done is False
|
|
sig.put(2)
|
|
assert status.done is False
|
|
sig.put(3)
|
|
assert status.done is True
|
|
assert status.success is True
|
|
assert status.exception() is None
|
|
|
|
# Test strict=True, failure_states
|
|
sig.put(1)
|
|
status = TransitionStatus(signal=sig, transitions=[1, 2, 3], strict=True, failure_states=[4])
|
|
assert status.done is False
|
|
sig.put(4)
|
|
with pytest.raises(ValueError):
|
|
status.wait()
|
|
|
|
assert status.done is True
|
|
assert status.success is False
|
|
assert isinstance(status.exception(), ValueError)
|
|
|
|
# Test strict=False, with intermediate transitions
|
|
sig.put(0)
|
|
status = TransitionStatus(signal=sig, transitions=[1, 2, 3], strict=False)
|
|
|
|
assert status.done is False
|
|
sig.put(1) # entering first transition
|
|
sig.put(3)
|
|
sig.put(2) # transision
|
|
assert status.done is False
|
|
sig.put(4)
|
|
sig.put(2)
|
|
sig.put(3) # last transition
|
|
assert status.done is True
|
|
assert status.success is True
|
|
assert status.exception() is None
|
|
|
|
|
|
def test_transition_status_strings():
|
|
"""Test TransitionStatus with string values"""
|
|
sig = Signal(name="test_signal", value="a")
|
|
|
|
# Test strict=True, without intermediate transitions
|
|
sig.put("a")
|
|
status = TransitionStatus(signal=sig, transitions=["b", "c", "d"], strict=True)
|
|
|
|
assert status.done is False
|
|
sig.put("b")
|
|
assert status.done is False
|
|
sig.put("c")
|
|
assert status.done is False
|
|
sig.put("d")
|
|
assert status.done is True
|
|
assert status.success is True
|
|
assert status.exception() is None
|
|
|
|
# Test strict=True with additional intermediate transition
|
|
|
|
sig.put("a")
|
|
status = TransitionStatus(signal=sig, transitions=["b", "c", "d"], strict=True)
|
|
|
|
assert status.done is False
|
|
sig.put("b") # first transition
|
|
sig.put("e")
|
|
sig.put("b")
|
|
sig.put("c") # transision
|
|
assert status.done is False
|
|
sig.put("f")
|
|
sig.put("b")
|
|
sig.put("c")
|
|
sig.put("d") # transision
|
|
assert status.done is True
|
|
assert status.success is True
|
|
assert status.exception() is None
|
|
|
|
# Test strict=False, with intermediate transitions
|
|
sig.put("a")
|
|
status = TransitionStatus(signal=sig, transitions=["b", "c", "d"], strict=False)
|
|
|
|
assert status.done is False
|
|
sig.put("b") # entering first transition
|
|
sig.put("d")
|
|
sig.put("c") # transision
|
|
assert status.done is False
|
|
sig.put("e")
|
|
sig.put("c")
|
|
sig.put("d") # last transition
|
|
assert status.done is True
|
|
assert status.success is True
|
|
|
|
|
|
def test_compare_status_with_mock_pv(mock_epics_signal_ro):
|
|
"""Test CompareStatus with EpicsSignalRO, this tests callbacks on EpicsSignals"""
|
|
|
|
signal = mock_epics_signal_ro
|
|
status = CompareStatus(signal=signal, value=5, operation_success="==")
|
|
assert status.done is False
|
|
signal._read_pv.mock_data = 1
|
|
assert status.done is False
|
|
signal._read_pv.mock_data = 5
|
|
status.wait(timeout=1)
|
|
assert status.done is True
|
|
assert status.success is True
|
|
|
|
|
|
def test_compare_status_raises_on_failed_comparison(mock_epics_signal_ro):
|
|
"""Test CompareStatus raises on failed comparison with EpicsSignalRO"""
|
|
|
|
signal = mock_epics_signal_ro
|
|
status = CompareStatus(
|
|
signal=signal, value=5, operation_success="==", failure_value=[np.array([10])]
|
|
)
|
|
assert status.done is False
|
|
signal._read_pv.mock_data = 1
|
|
with pytest.raises(Exception):
|
|
status.wait(timeout=5)
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"transitions, expected_done, expected_success",
|
|
[
|
|
([1, 2, 3], True, True), # Transitions completed successfully
|
|
([1, 3, 2], False, False), # Transitions completed with an error
|
|
([5, 4, 2, 1, 2, 3], True, True), # Transitions completed successfully
|
|
],
|
|
)
|
|
def test_transition_status_with_mock_pv(
|
|
mock_epics_signal_ro, transitions, expected_done, expected_success
|
|
):
|
|
"""Test TransitionStatus with EpicsSignalRO, this tests callbacks on EpicsSignals"""
|
|
# Starts immediately with 1
|
|
signal = mock_epics_signal_ro
|
|
signal._read_pv.mock_data = 1
|
|
status = TransitionStatus(signal=signal, transitions=[1, 2, 3], strict=False)
|
|
assert status.done is False
|
|
# Does not have to wait
|
|
signal._read_pv.mock_data = 3
|
|
signal._read_pv.mock_data = 2
|
|
signal._read_pv.mock_data = 3
|
|
status.wait(timeout=1)
|
|
assert status.done is True
|
|
assert status.success is True
|
|
# Test with various transitions
|
|
status = TransitionStatus(signal=signal, transitions=[1, 2, 3], strict=True)
|
|
for transition in transitions:
|
|
signal._read_pv.mock_data = transition
|
|
if expected_done:
|
|
status.wait(timeout=1)
|
|
assert status.done is True
|
|
assert status.success is expected_success
|
|
else:
|
|
with pytest.raises(WaitTimeoutError):
|
|
status.wait(timeout=1)
|
|
assert status.done is False
|
|
assert status.success is False
|
|
|
|
|
|
def test_patched_status_objects():
|
|
"""Test the patched Status objects in ophyd_devices that improve error handling."""
|
|
|
|
# StatusBase & AndStatus
|
|
st = StatusBase()
|
|
st2 = StatusBase()
|
|
and_st = st & st2
|
|
assert st in and_st
|
|
assert isinstance(and_st, AndStatus)
|
|
st.set_exception(ValueError("test error"))
|
|
with pytest.raises(ValueError):
|
|
and_st.wait(timeout=10)
|
|
|
|
# DeviceStatus & Status
|
|
dev = Device(name="device")
|
|
dev_status = DeviceStatus(device=dev)
|
|
|
|
st = Status()
|
|
and_st = st & dev_status
|
|
assert dev_status.device == dev
|
|
dev_status.set_exception(RuntimeError("device error"))
|
|
with pytest.raises(RuntimeError):
|
|
and_st.wait(timeout=10)
|
|
|
|
# Combine DeviceStatus with StatusBase and form AndStatus
|
|
st = StatusBase(obj=dev)
|
|
assert st.obj == dev
|
|
dev_st = DeviceStatus(device=dev)
|
|
combined_st = st & dev_st
|
|
st.set_finished()
|
|
dev_st.set_exception(RuntimeError("combined error"))
|
|
with pytest.raises(RuntimeError):
|
|
combined_st.wait(timeout=10)
|
|
|
|
# SubscriptionStatus
|
|
sig = Signal(name="test_signal", value=0)
|
|
|
|
def _cb(*args, **kwargs):
|
|
pass
|
|
|
|
sub_st = SubscriptionStatus(sig, callback=_cb)
|
|
sub_st.set_exception(ValueError("subscription error"))
|
|
with pytest.raises(ValueError):
|
|
sub_st.wait(timeout=10)
|
|
assert sub_st.done is True
|
|
assert sub_st.success is False
|
|
|
|
# MoveStatus, here the default for call_stop_on_failure is True
|
|
class Positioner(Device):
|
|
SUB_READBACK = "readback"
|
|
setpoint = Signal(name="setpoint", value=0)
|
|
readback = Signal(name="readback", value=0)
|
|
|
|
@property
|
|
def position(self):
|
|
return self.readback.get()
|
|
|
|
def stop(self):
|
|
pass
|
|
|
|
pos = Positioner(name="positioner")
|
|
move_st = MoveStatus(pos, target=10)
|
|
with mock.patch.object(pos, "stop") as mock_stop:
|
|
move_st.set_exception(RuntimeError("move error"))
|
|
mock_stop.assert_called_once()
|
|
with pytest.raises(RuntimeError):
|
|
move_st.wait(timeout=10)
|
|
assert move_st.done is True
|
|
assert move_st.success is False
|