mirror of
https://github.com/bec-project/ophyd_devices.git
synced 2025-06-24 11:41:09 +02:00
Add methods to PSIDeviceBase to register status object that should be cancelled when the device is stopped or destroyed.
179 lines
5.3 KiB
Python
179 lines
5.3 KiB
Python
"""Module for testing the PSIDeviceBase class."""
|
|
|
|
import threading
|
|
import time
|
|
from unittest import mock
|
|
|
|
import pytest
|
|
from ophyd import Device
|
|
from ophyd.status import StatusBase
|
|
|
|
from ophyd_devices.interfaces.base_classes.psi_device_base import DeviceStoppedError, PSIDeviceBase
|
|
from ophyd_devices.sim.sim_camera import SimCamera
|
|
from ophyd_devices.sim.sim_positioner import SimPositioner
|
|
|
|
# pylint: disable=redefined-outer-name
|
|
# pylint: disable=protected-access
|
|
|
|
|
|
class SimPositionerDevice(PSIDeviceBase, SimPositioner):
|
|
"""Simulated Positioner Device with PSI Device Base"""
|
|
|
|
|
|
class SimDevice(PSIDeviceBase, Device):
|
|
"""Simulated Device with PSI Device Base"""
|
|
|
|
|
|
@pytest.fixture
|
|
def device_positioner():
|
|
"""Fixture for Device"""
|
|
yield SimPositionerDevice(name="device")
|
|
|
|
|
|
@pytest.fixture
|
|
def device():
|
|
"""Fixture for Device"""
|
|
yield SimDevice(name="device", prefix="test:")
|
|
|
|
|
|
def test_psi_device_base_wait_for_signals(device_positioner):
|
|
"""Test wait_for_signals method"""
|
|
device: SimPositionerDevice = device_positioner
|
|
device.motor_is_moving.set(1).wait()
|
|
|
|
def check_motor_is_moving():
|
|
return device.motor_is_moving.get() == 0
|
|
|
|
# Timeout
|
|
assert device.wait_for_condition(check_motor_is_moving, timeout=0.2) is False
|
|
|
|
# Stopped
|
|
device._stopped = True
|
|
with pytest.raises(DeviceStoppedError):
|
|
device.wait_for_condition(check_motor_is_moving, timeout=1, check_stopped=True)
|
|
|
|
# Success
|
|
device._stopped = False
|
|
device.motor_is_moving.set(0).wait()
|
|
assert device.wait_for_condition(check_motor_is_moving, timeout=1, check_stopped=True) is True
|
|
|
|
device.velocity.set(10).wait()
|
|
|
|
def check_both_conditions():
|
|
return device.motor_is_moving.get() == 0 and device.velocity.get() == 10
|
|
|
|
# All signals True, default
|
|
assert device.wait_for_condition(check_both_conditions, timeout=1) is True
|
|
|
|
def check_any_conditions():
|
|
return device.motor_is_moving.get() == 0 or device.velocity.get() == 10
|
|
|
|
# Any signal is True
|
|
assert device.wait_for_condition(check_any_conditions, timeout=1) is True
|
|
|
|
|
|
def test_psi_device_base_init_with_device_manager():
|
|
"""Test init with device manager"""
|
|
dm = mock.MagicMock()
|
|
device = SimPositionerDevice(name="device", device_manager=dm)
|
|
assert device.device_manager is dm
|
|
# device_manager should b passed to SimCamera through PSIDeviceBase
|
|
device_2 = SimCamera(name="device", device_manager=dm)
|
|
assert device_2.device_manager is dm
|
|
|
|
|
|
def test_on_stage_hook(device):
|
|
"""Test user method hooks"""
|
|
with mock.patch.object(device, "on_stage") as mock_on_stage:
|
|
res = device.stage()
|
|
if not isinstance(res, StatusBase):
|
|
assert isinstance(res, list) is True
|
|
mock_on_stage.assert_called_once()
|
|
|
|
|
|
def test_on_destroy_hook(device):
|
|
"""Test on destroy hook"""
|
|
assert device.destroyed is False
|
|
with mock.patch.object(device, "on_destroy") as mock_on_destroy:
|
|
device.destroy()
|
|
mock_on_destroy.assert_called_once()
|
|
assert device.destroyed is True
|
|
|
|
|
|
def test_on_unstage_hook(device):
|
|
"""Test user method hooks"""
|
|
with mock.patch.object(device, "on_unstage") as mock_on_unstage:
|
|
res = device.unstage()
|
|
if not isinstance(res, StatusBase):
|
|
assert isinstance(res, list) is True
|
|
mock_on_unstage.assert_called_once()
|
|
|
|
|
|
def test_on_complete_hook(device):
|
|
"""Test user method hooks"""
|
|
with mock.patch.object(device, "on_complete") as mock_on_complete:
|
|
status = device.complete()
|
|
assert isinstance(status, StatusBase) is True
|
|
mock_on_complete.assert_called_once()
|
|
|
|
|
|
def test_on_kickoff_hook(device):
|
|
"""Test user method hooks"""
|
|
with mock.patch.object(device, "on_kickoff") as mock_on_kickoff:
|
|
status = device.kickoff()
|
|
assert isinstance(status, StatusBase) is True
|
|
mock_on_kickoff.assert_called_once()
|
|
|
|
|
|
def test_on_trigger_hook(device):
|
|
"""Test user method hooks"""
|
|
with mock.patch.object(device, "on_trigger") as mock_on_trigger:
|
|
mock_on_trigger.return_value = None
|
|
status = device.trigger()
|
|
assert isinstance(status, StatusBase) is True
|
|
mock_on_trigger.assert_called_once()
|
|
|
|
|
|
def test_on_pre_scan_hook(device):
|
|
"""Test user method hooks"""
|
|
with mock.patch.object(device, "on_pre_scan") as mock_on_pre_scan:
|
|
mock_on_pre_scan.return_value = None
|
|
status = device.pre_scan()
|
|
assert status is None
|
|
mock_on_pre_scan.assert_called_once()
|
|
|
|
|
|
def test_on_stop_hook(device):
|
|
"""Test user method hooks"""
|
|
with mock.patch.object(device, "on_stop") as mock_on_stop:
|
|
device.stop()
|
|
mock_on_stop.assert_called_once()
|
|
|
|
|
|
def test_stoppable_status(device):
|
|
"""Test stoppable status"""
|
|
status = StatusBase()
|
|
device.cancel_on_stop(status)
|
|
device.stop()
|
|
assert status.done is True
|
|
assert status.success is False
|
|
|
|
|
|
def test_stoppable_status_not_done(device):
|
|
"""Test stoppable status not done"""
|
|
|
|
def stop_after_delay():
|
|
time.sleep(5)
|
|
device.stop()
|
|
|
|
status = StatusBase()
|
|
device.cancel_on_stop(status)
|
|
thread = threading.Thread(target=stop_after_delay)
|
|
thread.start()
|
|
|
|
with pytest.raises(DeviceStoppedError, match="Device device has been stopped"):
|
|
status.wait()
|
|
|
|
assert status.done is True
|
|
assert status.success is False
|