From 392d32ef12ac53f4b82b7fa9cc83d791afcc81c5 Mon Sep 17 00:00:00 2001 From: gac-x12sa Date: Tue, 8 Oct 2024 15:01:10 +0200 Subject: [PATCH] refactor: ddg base class, add waitforstatus utility --- .../base_classes/psi_delay_generator_base.py | 142 +++++++++++++++++- 1 file changed, 141 insertions(+), 1 deletion(-) diff --git a/ophyd_devices/interfaces/base_classes/psi_delay_generator_base.py b/ophyd_devices/interfaces/base_classes/psi_delay_generator_base.py index 0b2db21..b936c87 100644 --- a/ophyd_devices/interfaces/base_classes/psi_delay_generator_base.py +++ b/ophyd_devices/interfaces/base_classes/psi_delay_generator_base.py @@ -1,4 +1,5 @@ import enum +import threading import time from typing import Any @@ -34,6 +35,9 @@ class DelayGeneratorError(Exception): class DeviceInitError(DelayGeneratorError): """Error upon failed initialization, invoked by missing device manager or device not started in sim_mode.""" +class DeviceStopError(DelayGeneratorError): + """Error upon failed initialization, invoked by missing device manager or device not started in sim_mode.""" + class DelayGeneratorNotOkay(DelayGeneratorError): """Error when DDG is not okay""" @@ -199,6 +203,142 @@ class DDGCustomMixin: elif status != "STATUS OK": raise DelayGeneratorNotOkay(f"DDG failed to start with status: {status}") + def wait_for_signals( + self, + signal_conditions: list[tuple], + timeout: float, + check_stopped: bool = False, + interval: float = 0.05, + all_signals: bool = False, + ) -> bool: + """ + Convenience wrapper to allow waiting for signals to reach a certain condition. + For EPICs PVs, an example usage is pasted at the bottom. + + Args: + signal_conditions (list[tuple]): tuple of executable calls for conditions (get_current_state, condition) to check + timeout (float): timeout in seconds + interval (float): interval in seconds + all_signals (bool): True if all signals should be True, False if any signal should be True + + Returns: + bool: True if all signals are in the desired state, False if timeout is reached + + >>> Example usage for EPICS PVs: + >>> self.wait_for_signals(signal_conditions=[(self.acquiring.get, False)], timeout=5, interval=0.05, check_stopped=True, all_signals=True) + """ + + timer = 0 + while True: + checks = [ + get_current_state() == condition + for get_current_state, condition in signal_conditions + ] + if check_stopped is True and self.parent.stopped is True: + return False + if (all_signals and all(checks)) or (not all_signals and any(checks)): + return True + if timer > timeout: + return False + time.sleep(interval) + timer += interval + + def wait_with_status( + self, + signal_conditions: list[tuple], + timeout: float, + check_stopped: bool = False, + interval: float = 0.05, + all_signals: bool = False, + exception_on_timeout: Exception = None, + ) -> DeviceStatus: + """Utility function to wait for signals in a thread. + Returns a DevicesStatus object that resolves either to set_finished or set_exception. + The DeviceStatus is attached to the parent device, i.e. the detector object inheriting from PSIDetectorBase. + + Usage: + This function should be used to wait for signals to reach a certain condition, especially in the context of + on_trigger and on_complete. If it is not used, functions may block and slow down the performance of BEC. + It will return a DeviceStatus object that is to be returned from the function. Once the conditions are met, + the DeviceStatus will be set to set_finished in case of success or set_exception in case of a timeout or exception. + The exception can be specified with the exception_on_timeout argument. The default exception is a TimeoutError. + + Args: + signal_conditions (list[tuple]): tuple of executable calls for conditions (get_current_state, condition) to check + timeout (float): timeout in seconds + check_stopped (bool): True if stopped flag should be checked + interval (float): interval in seconds + all_signals (bool): True if all signals should be True, False if any signal should be True + exception_on_timeout (Exception): Exception to raise on timeout + + Returns: + DeviceStatus: DeviceStatus object that resolves either to set_finished or set_exception + """ + if exception_on_timeout is None: + exception_on_timeout = DeviceTimeoutError( + f"Timeout error for {self.parent.name} while waiting for signals {signal_conditions}" + ) + + status = DeviceStatus(self.parent) + + # utility function to wrap the wait_for_signals function + def wait_for_signals_wrapper( + status: DeviceStatus, + signal_conditions: list[tuple], + timeout: float, + check_stopped: bool, + interval: float, + all_signals: bool, + exception_on_timeout: Exception, + ): + """Convenient wrapper around wait_for_signals to set status based on the result. + + Args: + status (DeviceStatus): DeviceStatus object to be set + signal_conditions (list[tuple]): tuple of executable calls for conditions (get_current_state, condition) to check + timeout (float): timeout in seconds + check_stopped (bool): True if stopped flag should be checked + interval (float): interval in seconds + all_signals (bool): True if all signals should be True, False if any signal should be True + exception_on_timeout (Exception): Exception to raise on timeout + """ + try: + result = self.wait_for_signals( + signal_conditions, timeout, check_stopped, interval, all_signals + ) + if result: + status.set_finished() + else: + if self.parent.stopped: + # INFO This will execute a callback to the parent device.stop() method + status.set_exception(exc=DeviceStopError(f"{self.parent.name} was stopped")) + else: + # INFO This will execute a callback to the parent device.stop() method + status.set_exception(exc=exception_on_timeout) + # pylint: disable=broad-except + except Exception as exc: + content = traceback.format_exc() + logger.warning( + f"Error in wait_for_signals in {self.parent.name}; Traceback: {content}" + ) + # INFO This will execute a callback to the parent device.stop() method + status.set_exception(exc=exc) + + thread = threading.Thread( + target=wait_for_signals_wrapper, + args=( + status, + signal_conditions, + timeout, + check_stopped, + interval, + all_signals, + exception_on_timeout, + ), + daemon=True, + ) + thread.start() + return status class PSIDelayGeneratorBase(Device): """ @@ -249,7 +389,7 @@ class PSIDelayGeneratorBase(Device): # Assign PVs from DDG645 trigger_burst_readout = Component( - EpicsSignal, "EventStatusLI.PROC", name="trigger_burst_readout" + EpicsSignal, "EventStatusLI.PROC", name="trigger_burst_readout",put_complete=True ) burst_cycle_finished = Component(EpicsSignalRO, "EventStatusMBBID.B3", name="read_burst_state") delay_finished = Component(EpicsSignalRO, "EventStatusMBBID.B2", name="delay_finished")