mirror of
https://github.com/bec-project/ophyd_devices.git
synced 2025-06-06 03:40:41 +02:00
refactor(psi_device_base): add method to wait for a condition to PSIDeviceBase
This commit is contained in:
parent
00ca4574ea
commit
1fd4fc7f21
@ -4,6 +4,9 @@ Base class for all PSI ophyd device integration to ensure consistent configurati
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
from typing import Any, Callable
|
||||
|
||||
from bec_lib.devicemanager import ScanInfo
|
||||
from ophyd import Device, DeviceStatus, Staged, StatusBase
|
||||
|
||||
@ -11,6 +14,10 @@ from ophyd_devices.tests.utils import get_mock_scan_info
|
||||
from ophyd_devices.utils.psi_device_base_utils import FileHandler, TaskHandler
|
||||
|
||||
|
||||
class DeviceStoppedError(Exception):
|
||||
"""Exception raised when a device is stopped"""
|
||||
|
||||
|
||||
class PSIDeviceBase(Device):
|
||||
"""
|
||||
Base class for all PSI ophyd devices to ensure consistent configuration
|
||||
@ -128,8 +135,44 @@ class PSIDeviceBase(Device):
|
||||
success (bool): True if the action was successful, False otherwise.
|
||||
"""
|
||||
self.on_stop()
|
||||
self.stopped = True # Set stopped flag to True, in case a custom stop method listens to stopped property
|
||||
super().stop(success=success)
|
||||
self.stopped = True
|
||||
|
||||
########################################
|
||||
# Utility Method to wait for signals #
|
||||
########################################
|
||||
|
||||
def wait_for_condition(
|
||||
self,
|
||||
condition: Callable[[], bool],
|
||||
timeout: float,
|
||||
check_stopped: bool = False,
|
||||
interval: float = 0.05,
|
||||
) -> bool:
|
||||
"""
|
||||
Utility method to easily wait for signals or methods to reach an expected state.
|
||||
|
||||
Args:
|
||||
condition (Callable): function that returns True if the condition is met, False otherwise
|
||||
timeout (float): timeout in seconds
|
||||
check_stopped (bool): True if stopped flag should be checked
|
||||
interval (float): interval in seconds
|
||||
|
||||
Returns:
|
||||
bool: True if all signals are in the desired state, False if timeout is reached
|
||||
|
||||
Example:
|
||||
>>> self.wait_for_condition(condition=my_condition, timeout=5, interval=0.05, check_stopped=True)
|
||||
"""
|
||||
|
||||
start_time = time.time()
|
||||
while time.time() < start_time + timeout:
|
||||
if condition() is True:
|
||||
return True
|
||||
if check_stopped is True and self.stopped is True:
|
||||
raise DeviceStoppedError(f"Device {self.name} has been stopped")
|
||||
time.sleep(interval)
|
||||
return False
|
||||
|
||||
########################################
|
||||
# Beamline Specific Implementations #
|
||||
|
49
tests/test_psi_device_base.py
Normal file
49
tests/test_psi_device_base.py
Normal file
@ -0,0 +1,49 @@
|
||||
import pytest
|
||||
|
||||
from ophyd_devices.interfaces.base_classes.psi_device_base import DeviceStoppedError, PSIDeviceBase
|
||||
from ophyd_devices.sim.sim_positioner import SimPositioner
|
||||
|
||||
|
||||
class SimPositionerDevice(PSIDeviceBase, SimPositioner):
|
||||
"""Simulated Positioner Device with PSI Device Base"""
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def device():
|
||||
"""Fixture for Device"""
|
||||
yield SimPositionerDevice(name="device")
|
||||
|
||||
|
||||
def test_psi_device_base_wait_for_signals(device):
|
||||
"""Test wait_for_signals method"""
|
||||
device.motor_is_moving.set(1).wait()
|
||||
|
||||
def check_motor_is_moving():
|
||||
return device.motor_is_moving.get() == 0
|
||||
|
||||
# Timeout
|
||||
assert device.wait_for_condition(check_motor_is_moving, timeout=0.2) is False
|
||||
|
||||
# Stopped
|
||||
device._stopped = True
|
||||
with pytest.raises(DeviceStoppedError):
|
||||
device.wait_for_condition(check_motor_is_moving, timeout=1, check_stopped=True)
|
||||
|
||||
# Success
|
||||
device._stopped = False
|
||||
device.motor_is_moving.set(0).wait()
|
||||
assert device.wait_for_condition(check_motor_is_moving, timeout=1, check_stopped=True) is True
|
||||
|
||||
device.velocity.set(10).wait()
|
||||
|
||||
def check_both_conditions():
|
||||
return device.motor_is_moving.get() == 0 and device.velocity.get() == 10
|
||||
|
||||
# All signals True, default
|
||||
assert device.wait_for_condition(check_both_conditions, timeout=1) is True
|
||||
|
||||
def check_any_conditions():
|
||||
return device.motor_is_moving.get() == 0 or device.velocity.get() == 10
|
||||
|
||||
# Any signal is True
|
||||
assert device.wait_for_condition(check_any_conditions, timeout=1) is True
|
Loading…
x
Reference in New Issue
Block a user