frappy_psi.softcal: add function evaluator
This commit is contained in:
@@ -30,6 +30,8 @@ from frappy.core import Attached, BoolType, Parameter, Readable, StringType, \
|
|||||||
FloatRange, nopoll
|
FloatRange, nopoll
|
||||||
from frappy_psi.convergence import HasConvergence
|
from frappy_psi.convergence import HasConvergence
|
||||||
from frappy_psi.picontrol import PImixin
|
from frappy_psi.picontrol import PImixin
|
||||||
|
from frappy.lib.mathparser import MathParser
|
||||||
|
from frappy.errors import RangeError
|
||||||
|
|
||||||
|
|
||||||
def linear(x):
|
def linear(x):
|
||||||
@@ -238,3 +240,60 @@ class Sensor(Readable):
|
|||||||
|
|
||||||
class SoftPiLoop(HasConvergence, PImixin, Sensor):
|
class SoftPiLoop(HasConvergence, PImixin, Sensor):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class Function(Readable):
|
||||||
|
rawsensor = Attached()
|
||||||
|
|
||||||
|
formula = Parameter('calib function', StringType(), readonly=False)
|
||||||
|
value = Parameter('physical quantity', FloatRange())
|
||||||
|
_parser = MathParser(x=0)
|
||||||
|
_value_error = None
|
||||||
|
|
||||||
|
def initModule(self):
|
||||||
|
super().initModule()
|
||||||
|
self.rawsensor.registerCallbacks(self, ['status']) # auto update status
|
||||||
|
if self.description == '':
|
||||||
|
self.description = f'{self.rawsensor!r} with applied function'
|
||||||
|
|
||||||
|
def _get_status(self, rawstatus):
|
||||||
|
return rawstatus if self._value_error is None else (self.Status.ERROR, self._value_error)
|
||||||
|
|
||||||
|
def update_value(self, rawvalue, err=None):
|
||||||
|
if err:
|
||||||
|
err = repr(err)
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
self.value = self._get_value(rawvalue)
|
||||||
|
except Exception as e:
|
||||||
|
err = repr(e)
|
||||||
|
if err != self._value_error:
|
||||||
|
self._value_error = err
|
||||||
|
self.status = self._get_status(self.rawsensor.status)
|
||||||
|
|
||||||
|
def update_status(self, rawstatus):
|
||||||
|
self.status = self._get_status(rawstatus)
|
||||||
|
|
||||||
|
def write_formula(self, formula):
|
||||||
|
self._parser = MathParser(x=0)
|
||||||
|
self.formula = formula
|
||||||
|
self.read_value()
|
||||||
|
self._value_error = None
|
||||||
|
self.read_status()
|
||||||
|
|
||||||
|
def _get_value(self, rawvalue):
|
||||||
|
try:
|
||||||
|
return self._parser.calculate(self.formula, x=rawvalue)
|
||||||
|
except Exception as e:
|
||||||
|
self._value_error = f'error in formula {self.formula!r}: {e!r}'
|
||||||
|
self.read_status()
|
||||||
|
raise RangeError(self._value_error) from None
|
||||||
|
|
||||||
|
@nopoll
|
||||||
|
def read_value(self):
|
||||||
|
return self._get_value(self.rawsensor.read_value())
|
||||||
|
|
||||||
|
@nopoll
|
||||||
|
def read_status(self):
|
||||||
|
return self._get_status(self.rawsensor.read_status())
|
||||||
|
|
||||||
Reference in New Issue
Block a user