mirror of
https://github.com/bec-project/ophyd_devices.git
synced 2025-06-25 04:01:09 +02:00
refactor: ddg base class, add waitforstatus utility
This commit is contained in:
@ -1,4 +1,5 @@
|
|||||||
import enum
|
import enum
|
||||||
|
import threading
|
||||||
import time
|
import time
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
@ -34,6 +35,9 @@ class DelayGeneratorError(Exception):
|
|||||||
class DeviceInitError(DelayGeneratorError):
|
class DeviceInitError(DelayGeneratorError):
|
||||||
"""Error upon failed initialization, invoked by missing device manager or device not started in sim_mode."""
|
"""Error upon failed initialization, invoked by missing device manager or device not started in sim_mode."""
|
||||||
|
|
||||||
|
class DeviceStopError(DelayGeneratorError):
|
||||||
|
"""Error upon failed initialization, invoked by missing device manager or device not started in sim_mode."""
|
||||||
|
|
||||||
|
|
||||||
class DelayGeneratorNotOkay(DelayGeneratorError):
|
class DelayGeneratorNotOkay(DelayGeneratorError):
|
||||||
"""Error when DDG is not okay"""
|
"""Error when DDG is not okay"""
|
||||||
@ -199,6 +203,142 @@ class DDGCustomMixin:
|
|||||||
elif status != "STATUS OK":
|
elif status != "STATUS OK":
|
||||||
raise DelayGeneratorNotOkay(f"DDG failed to start with status: {status}")
|
raise DelayGeneratorNotOkay(f"DDG failed to start with status: {status}")
|
||||||
|
|
||||||
|
def wait_for_signals(
|
||||||
|
self,
|
||||||
|
signal_conditions: list[tuple],
|
||||||
|
timeout: float,
|
||||||
|
check_stopped: bool = False,
|
||||||
|
interval: float = 0.05,
|
||||||
|
all_signals: bool = False,
|
||||||
|
) -> bool:
|
||||||
|
"""
|
||||||
|
Convenience wrapper to allow waiting for signals to reach a certain condition.
|
||||||
|
For EPICs PVs, an example usage is pasted at the bottom.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
signal_conditions (list[tuple]): tuple of executable calls for conditions (get_current_state, condition) to check
|
||||||
|
timeout (float): timeout in seconds
|
||||||
|
interval (float): interval in seconds
|
||||||
|
all_signals (bool): True if all signals should be True, False if any signal should be True
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bool: True if all signals are in the desired state, False if timeout is reached
|
||||||
|
|
||||||
|
>>> Example usage for EPICS PVs:
|
||||||
|
>>> self.wait_for_signals(signal_conditions=[(self.acquiring.get, False)], timeout=5, interval=0.05, check_stopped=True, all_signals=True)
|
||||||
|
"""
|
||||||
|
|
||||||
|
timer = 0
|
||||||
|
while True:
|
||||||
|
checks = [
|
||||||
|
get_current_state() == condition
|
||||||
|
for get_current_state, condition in signal_conditions
|
||||||
|
]
|
||||||
|
if check_stopped is True and self.parent.stopped is True:
|
||||||
|
return False
|
||||||
|
if (all_signals and all(checks)) or (not all_signals and any(checks)):
|
||||||
|
return True
|
||||||
|
if timer > timeout:
|
||||||
|
return False
|
||||||
|
time.sleep(interval)
|
||||||
|
timer += interval
|
||||||
|
|
||||||
|
def wait_with_status(
|
||||||
|
self,
|
||||||
|
signal_conditions: list[tuple],
|
||||||
|
timeout: float,
|
||||||
|
check_stopped: bool = False,
|
||||||
|
interval: float = 0.05,
|
||||||
|
all_signals: bool = False,
|
||||||
|
exception_on_timeout: Exception = None,
|
||||||
|
) -> DeviceStatus:
|
||||||
|
"""Utility function to wait for signals in a thread.
|
||||||
|
Returns a DevicesStatus object that resolves either to set_finished or set_exception.
|
||||||
|
The DeviceStatus is attached to the parent device, i.e. the detector object inheriting from PSIDetectorBase.
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
This function should be used to wait for signals to reach a certain condition, especially in the context of
|
||||||
|
on_trigger and on_complete. If it is not used, functions may block and slow down the performance of BEC.
|
||||||
|
It will return a DeviceStatus object that is to be returned from the function. Once the conditions are met,
|
||||||
|
the DeviceStatus will be set to set_finished in case of success or set_exception in case of a timeout or exception.
|
||||||
|
The exception can be specified with the exception_on_timeout argument. The default exception is a TimeoutError.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
signal_conditions (list[tuple]): tuple of executable calls for conditions (get_current_state, condition) to check
|
||||||
|
timeout (float): timeout in seconds
|
||||||
|
check_stopped (bool): True if stopped flag should be checked
|
||||||
|
interval (float): interval in seconds
|
||||||
|
all_signals (bool): True if all signals should be True, False if any signal should be True
|
||||||
|
exception_on_timeout (Exception): Exception to raise on timeout
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
DeviceStatus: DeviceStatus object that resolves either to set_finished or set_exception
|
||||||
|
"""
|
||||||
|
if exception_on_timeout is None:
|
||||||
|
exception_on_timeout = DeviceTimeoutError(
|
||||||
|
f"Timeout error for {self.parent.name} while waiting for signals {signal_conditions}"
|
||||||
|
)
|
||||||
|
|
||||||
|
status = DeviceStatus(self.parent)
|
||||||
|
|
||||||
|
# utility function to wrap the wait_for_signals function
|
||||||
|
def wait_for_signals_wrapper(
|
||||||
|
status: DeviceStatus,
|
||||||
|
signal_conditions: list[tuple],
|
||||||
|
timeout: float,
|
||||||
|
check_stopped: bool,
|
||||||
|
interval: float,
|
||||||
|
all_signals: bool,
|
||||||
|
exception_on_timeout: Exception,
|
||||||
|
):
|
||||||
|
"""Convenient wrapper around wait_for_signals to set status based on the result.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
status (DeviceStatus): DeviceStatus object to be set
|
||||||
|
signal_conditions (list[tuple]): tuple of executable calls for conditions (get_current_state, condition) to check
|
||||||
|
timeout (float): timeout in seconds
|
||||||
|
check_stopped (bool): True if stopped flag should be checked
|
||||||
|
interval (float): interval in seconds
|
||||||
|
all_signals (bool): True if all signals should be True, False if any signal should be True
|
||||||
|
exception_on_timeout (Exception): Exception to raise on timeout
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
result = self.wait_for_signals(
|
||||||
|
signal_conditions, timeout, check_stopped, interval, all_signals
|
||||||
|
)
|
||||||
|
if result:
|
||||||
|
status.set_finished()
|
||||||
|
else:
|
||||||
|
if self.parent.stopped:
|
||||||
|
# INFO This will execute a callback to the parent device.stop() method
|
||||||
|
status.set_exception(exc=DeviceStopError(f"{self.parent.name} was stopped"))
|
||||||
|
else:
|
||||||
|
# INFO This will execute a callback to the parent device.stop() method
|
||||||
|
status.set_exception(exc=exception_on_timeout)
|
||||||
|
# pylint: disable=broad-except
|
||||||
|
except Exception as exc:
|
||||||
|
content = traceback.format_exc()
|
||||||
|
logger.warning(
|
||||||
|
f"Error in wait_for_signals in {self.parent.name}; Traceback: {content}"
|
||||||
|
)
|
||||||
|
# INFO This will execute a callback to the parent device.stop() method
|
||||||
|
status.set_exception(exc=exc)
|
||||||
|
|
||||||
|
thread = threading.Thread(
|
||||||
|
target=wait_for_signals_wrapper,
|
||||||
|
args=(
|
||||||
|
status,
|
||||||
|
signal_conditions,
|
||||||
|
timeout,
|
||||||
|
check_stopped,
|
||||||
|
interval,
|
||||||
|
all_signals,
|
||||||
|
exception_on_timeout,
|
||||||
|
),
|
||||||
|
daemon=True,
|
||||||
|
)
|
||||||
|
thread.start()
|
||||||
|
return status
|
||||||
|
|
||||||
class PSIDelayGeneratorBase(Device):
|
class PSIDelayGeneratorBase(Device):
|
||||||
"""
|
"""
|
||||||
@ -249,7 +389,7 @@ class PSIDelayGeneratorBase(Device):
|
|||||||
|
|
||||||
# Assign PVs from DDG645
|
# Assign PVs from DDG645
|
||||||
trigger_burst_readout = Component(
|
trigger_burst_readout = Component(
|
||||||
EpicsSignal, "EventStatusLI.PROC", name="trigger_burst_readout"
|
EpicsSignal, "EventStatusLI.PROC", name="trigger_burst_readout",put_complete=True
|
||||||
)
|
)
|
||||||
burst_cycle_finished = Component(EpicsSignalRO, "EventStatusMBBID.B3", name="read_burst_state")
|
burst_cycle_finished = Component(EpicsSignalRO, "EventStatusMBBID.B3", name="read_burst_state")
|
||||||
delay_finished = Component(EpicsSignalRO, "EventStatusMBBID.B2", name="delay_finished")
|
delay_finished = Component(EpicsSignalRO, "EventStatusMBBID.B2", name="delay_finished")
|
||||||
|
Reference in New Issue
Block a user