From d9f09b0d866f97a859c9b437474928e7a9e8c1b6 Mon Sep 17 00:00:00 2001 From: wakonig_k Date: Fri, 8 Mar 2024 13:39:34 +0100 Subject: [PATCH] feat: added computed signal --- ophyd_devices/__init__.py | 5 +- ophyd_devices/utils/dynamic_pseudo.py | 122 ++++++++++++++++++++++++++ 2 files changed, 125 insertions(+), 2 deletions(-) create mode 100644 ophyd_devices/utils/dynamic_pseudo.py diff --git a/ophyd_devices/__init__.py b/ophyd_devices/__init__.py index 8205c20..4fc1976 100644 --- a/ophyd_devices/__init__.py +++ b/ophyd_devices/__init__.py @@ -7,18 +7,19 @@ from .galil.sgalil_ophyd import SGalilMotor from .npoint.npoint import NPointAxis from .rt_lamni import RtFlomniMotor, RtLamniMotor from .sim.sim import SimCamera -from .sim.sim import SimMonitor from .sim.sim import SimFlyer from .sim.sim import SimFlyer as SynFlyer +from .sim.sim import SimMonitor from .sim.sim import SimMonitor as SynAxisMonitor from .sim.sim import SimMonitor as SynGaussBEC from .sim.sim import SimPositioner from .sim.sim import SimPositioner as SynAxisOPAAS from .sim.sim import SynDeviceOPAAS +from .sim.sim_frameworks import DeviceProxy, H5ImageReplayProxy, SlitProxy from .sim.sim_signals import ReadOnlySignal -from .sim.sim_frameworks import DeviceProxy, SlitProxy, H5ImageReplayProxy from .sim.sim_signals import ReadOnlySignal as SynSignalRO from .sls_devices.sls_devices import SLSInfo, SLSOperatorMessages from .smaract.smaract_ophyd import SmaractMotor from .utils.bec_device_base import BECDeviceBase +from .utils.dynamic_pseudo import ComputedSignal from .utils.static_device_test import launch diff --git a/ophyd_devices/utils/dynamic_pseudo.py b/ophyd_devices/utils/dynamic_pseudo.py new file mode 100644 index 0000000..097ce74 --- /dev/null +++ b/ophyd_devices/utils/dynamic_pseudo.py @@ -0,0 +1,122 @@ +""" +This module provides a class for creating a pseudo signal that is computed from other signals. +""" + +from functools import reduce + +from ophyd import SignalRO +from ophyd.ophydobj import Kind + + +def rgetattr(obj, attr, *args): + """See https://stackoverflow.com/questions/31174295/getattr-and-setattr-on-nested-objects""" + + def _getattr(obj, attr): + return getattr(obj, attr, *args) + + return reduce(_getattr, [obj] + attr.split(".")) + + +class ComputedSignal(SignalRO): + """ + A read-only signal that is computed from other signals. The compute method should be a string + representation of a function that takes the input signals as arguments and returns the computed + value. The input signals should be provided as a list of strings that represent the path to the + signal in the device manager. + """ + + def __init__( + self, + *, + name, + value=0, + timestamp=None, + device_manager=None, + parent=None, + labels=None, + kind=Kind.hinted, + tolerance=None, + rtolerance=None, + metadata=None, + cl=None, + attr_name="" + ): + super().__init__( + name=name, + value=value, + timestamp=timestamp, + parent=parent, + labels=labels, + kind=kind, + tolerance=tolerance, + rtolerance=rtolerance, + metadata=metadata, + cl=cl, + attr_name=attr_name, + ) + self._device_manager = device_manager + self._input_signals = [] + self._signal_subs = [] + self._compute_method = None + + def _signal_callback(self, *args, **kwargs): + self._run_subs(sub_type=self.SUB_VALUE, old_value=None, value=self.get()) + + @property + def compute_method(self): + """ + Set the compute method for the pseudo signal + + Args: + compute_method (str): The compute method to be used. This should be a string + representation of a function that takes the input signals as arguments + and returns the computed value. + + """ + return self._compute_method + + @compute_method.setter + def compute_method(self, method: str): + if not method.startswith("def"): + raise ValueError("The compute method should be a string representation of a function") + + # get the function name + function_name = method.split("(")[0].split(" ")[1] + # pylint: disable=exec-used + exec(method) + self._compute_method = locals()[function_name] + + @property + def input_signals(self): + """ + Set the input signals for the pseudo signal + + Args: + *input_vars: The input signals to be used for the computation + + """ + return self._input_signals + + @input_signals.setter + def input_signals(self, input_vars): + if self._signal_subs: + for signal, sub_id in self._signal_subs: + signal.unsubscribe(sub_id) + signals = [] + for signal in input_vars: + if isinstance(signal, str): + target = signal.replace("_", ".") + parts = target.split(".") + target = ".".join([parts[0], "obj"] + parts[1:]) + obj = rgetattr(self._device_manager.devices, target) + sub_id = obj.subscribe(self._signal_callback) + self._signal_subs.append((obj, sub_id)) + signals.append(obj) + else: + signals.append(signal) + self._input_signals = signals + + def get(self): + if self._compute_method: + return self._compute_method(*self._input_signals) + return None