mirror of
https://github.com/bec-project/ophyd_devices.git
synced 2025-05-25 18:00:40 +02:00
fix: fixed dynamic pseudo
This commit is contained in:
parent
63de4f5a22
commit
33e4458c59
@ -5,9 +5,12 @@ This module provides a class for creating a pseudo signal that is computed from
|
|||||||
from functools import reduce
|
from functools import reduce
|
||||||
from typing import Callable
|
from typing import Callable
|
||||||
|
|
||||||
|
from bec_lib import bec_logger
|
||||||
from ophyd import SignalRO
|
from ophyd import SignalRO
|
||||||
from ophyd.ophydobj import Kind
|
from ophyd.ophydobj import Kind
|
||||||
|
|
||||||
|
logger = bec_logger.logger
|
||||||
|
|
||||||
|
|
||||||
def rgetattr(obj, attr, *args):
|
def rgetattr(obj, attr, *args):
|
||||||
"""See https://stackoverflow.com/questions/31174295/getattr-and-setattr-on-nested-objects"""
|
"""See https://stackoverflow.com/questions/31174295/getattr-and-setattr-on-nested-objects"""
|
||||||
@ -40,7 +43,7 @@ class ComputedSignal(SignalRO):
|
|||||||
rtolerance=None,
|
rtolerance=None,
|
||||||
metadata=None,
|
metadata=None,
|
||||||
cl=None,
|
cl=None,
|
||||||
attr_name=""
|
attr_name="",
|
||||||
):
|
):
|
||||||
super().__init__(
|
super().__init__(
|
||||||
name=name,
|
name=name,
|
||||||
@ -59,6 +62,7 @@ class ComputedSignal(SignalRO):
|
|||||||
self._input_signals = []
|
self._input_signals = []
|
||||||
self._signal_subs = []
|
self._signal_subs = []
|
||||||
self._compute_method = None
|
self._compute_method = None
|
||||||
|
self._compute_method_str = None
|
||||||
|
|
||||||
def _signal_callback(self, *args, **kwargs):
|
def _signal_callback(self, *args, **kwargs):
|
||||||
self._run_subs(sub_type=self.SUB_VALUE, old_value=None, value=self.get())
|
self._run_subs(sub_type=self.SUB_VALUE, old_value=None, value=self.get())
|
||||||
@ -77,18 +81,22 @@ class ComputedSignal(SignalRO):
|
|||||||
>>> signal.compute_method = "def test(a, b): return a.get() + b.get()"
|
>>> signal.compute_method = "def test(a, b): return a.get() + b.get()"
|
||||||
|
|
||||||
"""
|
"""
|
||||||
return self._compute_method
|
return self._compute_method_str
|
||||||
|
|
||||||
@compute_method.setter
|
@compute_method.setter
|
||||||
def compute_method(self, method: str):
|
def compute_method(self, method: str):
|
||||||
|
logger.info(f"Updating compute method for {self.name}.")
|
||||||
|
method = method.strip()
|
||||||
if not method.startswith("def"):
|
if not method.startswith("def"):
|
||||||
raise ValueError("The compute method should be a string representation of a function")
|
raise ValueError("The compute method should be a string representation of a function")
|
||||||
|
|
||||||
# get the function name
|
# get the function name
|
||||||
function_name = method.split("(")[0].split(" ")[1]
|
function_name = method.split("(")[0].split(" ")[1]
|
||||||
|
method = method.replace(function_name, "user_compute_method")
|
||||||
|
self._compute_method_str = method
|
||||||
# pylint: disable=exec-used
|
# pylint: disable=exec-used
|
||||||
exec(method)
|
exec(method)
|
||||||
self._compute_method = locals()[function_name]
|
self._compute_method = locals()["user_compute_method"]
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def input_signals(self):
|
def input_signals(self):
|
||||||
@ -124,7 +132,9 @@ class ComputedSignal(SignalRO):
|
|||||||
self._input_signals = signals
|
self._input_signals = signals
|
||||||
|
|
||||||
def get(self):
|
def get(self):
|
||||||
if self.compute_method:
|
if self._compute_method:
|
||||||
# pylint: disable=not-callable
|
# pylint: disable=not-callable
|
||||||
return self.compute_method(*self.input_signals)
|
if self.input_signals:
|
||||||
|
return self._compute_method(*self.input_signals)
|
||||||
|
return self._compute_method()
|
||||||
return None
|
return None
|
||||||
|
@ -27,4 +27,6 @@ def test_computed_signal(device_manager_with_devices):
|
|||||||
signal.input_signals = ["a_readback", "b_readback"]
|
signal.input_signals = ["a_readback", "b_readback"]
|
||||||
assert signal.get() == 40
|
assert signal.get() == 40
|
||||||
|
|
||||||
assert callable(signal.compute_method)
|
# pylint: disable=protected-access
|
||||||
|
assert callable(signal._compute_method)
|
||||||
|
assert signal._compute_method_str == "def user_compute_method(a, b): return a.get() + b.get()"
|
||||||
|
Loading…
x
Reference in New Issue
Block a user