mirror of
https://github.com/bec-project/ophyd_devices.git
synced 2026-02-04 14:18:41 +01:00
feat: add custom status, CompareStatus and TargetStatus for easier signal value comparison
This commit is contained in:
@@ -20,4 +20,5 @@ from .devices.softpositioner import SoftPositioner
|
||||
from .utils.bec_device_base import BECDeviceBase
|
||||
from .utils.bec_signals import *
|
||||
from .utils.dynamic_pseudo import ComputedSignal
|
||||
from .utils.psi_device_base_utils import CompareStatus, TargetStatus
|
||||
from .utils.static_device_test import launch
|
||||
|
||||
@@ -1,16 +1,18 @@
|
||||
"""Utility handler to run tasks (function, conditions) in an asynchronous fashion."""
|
||||
|
||||
import ctypes
|
||||
import operator
|
||||
import threading
|
||||
import traceback
|
||||
import uuid
|
||||
from enum import Enum
|
||||
from typing import TYPE_CHECKING, Callable
|
||||
from typing import TYPE_CHECKING, Any, Callable, Literal
|
||||
|
||||
from bec_lib.file_utils import get_full_path
|
||||
from bec_lib.logger import bec_logger
|
||||
from bec_lib.utils.import_utils import lazy_import_from
|
||||
from ophyd import Device, DeviceStatus
|
||||
from ophyd import Device, DeviceStatus, Signal
|
||||
from ophyd.status import SubscriptionStatus
|
||||
|
||||
if TYPE_CHECKING: # pragma: no cover
|
||||
from bec_lib.messages import ScanStatusMessage
|
||||
@@ -23,6 +25,83 @@ logger = bec_logger.logger
|
||||
|
||||
set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc
|
||||
|
||||
OP_MAP = {
|
||||
"==": operator.eq,
|
||||
"!=": operator.ne,
|
||||
"<": operator.lt,
|
||||
"<=": operator.le,
|
||||
">": operator.gt,
|
||||
">=": operator.ge,
|
||||
}
|
||||
|
||||
|
||||
class CompareStatus(SubscriptionStatus):
|
||||
"""Status class to compare a value from a device signal with a target value."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
signal: Signal,
|
||||
value: Any,
|
||||
*,
|
||||
operation: Literal["==", "!=", "<", "<=", ">", ">="] = "==",
|
||||
event_type=None,
|
||||
timeout: float = None,
|
||||
settle_time: float = 0,
|
||||
run: bool = True,
|
||||
):
|
||||
if operation not in ("==", "!=", "<", "<=", ">", ">="):
|
||||
raise ValueError(
|
||||
f"Invalid operation: {operation}. Must be one of '==', '!=', '<', '<=', '>', '>='."
|
||||
)
|
||||
self._signal = signal
|
||||
self._value = value
|
||||
self._operation = operation
|
||||
super().__init__(
|
||||
device=signal,
|
||||
callback=self._compare_callback,
|
||||
timeout=timeout,
|
||||
settle_time=settle_time,
|
||||
event_type=event_type,
|
||||
run=run,
|
||||
)
|
||||
|
||||
def _compare_callback(self, value, **kwargs) -> bool:
|
||||
"""Callback for subscription Status"""
|
||||
return OP_MAP[self._operation](value, self._value)
|
||||
|
||||
|
||||
class TargetStatus(SubscriptionStatus):
|
||||
"""Status class to compare a list of values that are expected to be reached in sequence for a device signal."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
signal: Signal,
|
||||
values: list[Any],
|
||||
*,
|
||||
event_type=None,
|
||||
timeout: float = None,
|
||||
settle_time: float = 0,
|
||||
run: bool = True,
|
||||
):
|
||||
self._signal = signal
|
||||
self._values = values
|
||||
super().__init__(
|
||||
device=signal,
|
||||
callback=self._compare_callback,
|
||||
timeout=timeout,
|
||||
settle_time=settle_time,
|
||||
event_type=event_type,
|
||||
run=run,
|
||||
)
|
||||
|
||||
def _compare_callback(self, value, **kwargs) -> bool:
|
||||
"""Callback for subscription Status"""
|
||||
if value == self._values[0]:
|
||||
self._values.pop(0)
|
||||
if len(self._values) == 0:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
class TaskState(str, Enum):
|
||||
"""Possible task states"""
|
||||
|
||||
Reference in New Issue
Block a user