From 1fd4fc7f21a70804c2b8956b6c636278ea54f2dc Mon Sep 17 00:00:00 2001 From: appel_c Date: Wed, 12 Mar 2025 11:51:04 +0100 Subject: [PATCH] refactor(psi_device_base): add method to wait for a condition to PSIDeviceBase --- .../base_classes/psi_device_base.py | 45 ++++++++++++++++- tests/test_psi_device_base.py | 49 +++++++++++++++++++ 2 files changed, 93 insertions(+), 1 deletion(-) create mode 100644 tests/test_psi_device_base.py diff --git a/ophyd_devices/interfaces/base_classes/psi_device_base.py b/ophyd_devices/interfaces/base_classes/psi_device_base.py index de6ad3e..c397b60 100644 --- a/ophyd_devices/interfaces/base_classes/psi_device_base.py +++ b/ophyd_devices/interfaces/base_classes/psi_device_base.py @@ -4,6 +4,9 @@ Base class for all PSI ophyd device integration to ensure consistent configurati from __future__ import annotations +import time +from typing import Any, Callable + from bec_lib.devicemanager import ScanInfo from ophyd import Device, DeviceStatus, Staged, StatusBase @@ -11,6 +14,10 @@ from ophyd_devices.tests.utils import get_mock_scan_info from ophyd_devices.utils.psi_device_base_utils import FileHandler, TaskHandler +class DeviceStoppedError(Exception): + """Exception raised when a device is stopped""" + + class PSIDeviceBase(Device): """ Base class for all PSI ophyd devices to ensure consistent configuration @@ -128,8 +135,44 @@ class PSIDeviceBase(Device): success (bool): True if the action was successful, False otherwise. """ self.on_stop() + self.stopped = True # Set stopped flag to True, in case a custom stop method listens to stopped property super().stop(success=success) - self.stopped = True + + ######################################## + # Utility Method to wait for signals # + ######################################## + + def wait_for_condition( + self, + condition: Callable[[], bool], + timeout: float, + check_stopped: bool = False, + interval: float = 0.05, + ) -> bool: + """ + Utility method to easily wait for signals or methods to reach an expected state. + + Args: + condition (Callable): function that returns True if the condition is met, False otherwise + timeout (float): timeout in seconds + check_stopped (bool): True if stopped flag should be checked + interval (float): interval in seconds + + Returns: + bool: True if all signals are in the desired state, False if timeout is reached + + Example: + >>> self.wait_for_condition(condition=my_condition, timeout=5, interval=0.05, check_stopped=True) + """ + + start_time = time.time() + while time.time() < start_time + timeout: + if condition() is True: + return True + if check_stopped is True and self.stopped is True: + raise DeviceStoppedError(f"Device {self.name} has been stopped") + time.sleep(interval) + return False ######################################## # Beamline Specific Implementations # diff --git a/tests/test_psi_device_base.py b/tests/test_psi_device_base.py new file mode 100644 index 0000000..cf1740c --- /dev/null +++ b/tests/test_psi_device_base.py @@ -0,0 +1,49 @@ +import pytest + +from ophyd_devices.interfaces.base_classes.psi_device_base import DeviceStoppedError, PSIDeviceBase +from ophyd_devices.sim.sim_positioner import SimPositioner + + +class SimPositionerDevice(PSIDeviceBase, SimPositioner): + """Simulated Positioner Device with PSI Device Base""" + + +@pytest.fixture +def device(): + """Fixture for Device""" + yield SimPositionerDevice(name="device") + + +def test_psi_device_base_wait_for_signals(device): + """Test wait_for_signals method""" + device.motor_is_moving.set(1).wait() + + def check_motor_is_moving(): + return device.motor_is_moving.get() == 0 + + # Timeout + assert device.wait_for_condition(check_motor_is_moving, timeout=0.2) is False + + # Stopped + device._stopped = True + with pytest.raises(DeviceStoppedError): + device.wait_for_condition(check_motor_is_moving, timeout=1, check_stopped=True) + + # Success + device._stopped = False + device.motor_is_moving.set(0).wait() + assert device.wait_for_condition(check_motor_is_moving, timeout=1, check_stopped=True) is True + + device.velocity.set(10).wait() + + def check_both_conditions(): + return device.motor_is_moving.get() == 0 and device.velocity.get() == 10 + + # All signals True, default + assert device.wait_for_condition(check_both_conditions, timeout=1) is True + + def check_any_conditions(): + return device.motor_is_moving.get() == 0 or device.velocity.get() == 10 + + # Any signal is True + assert device.wait_for_condition(check_any_conditions, timeout=1) is True