improve softcal
- be more tolerant parsing header of .340 file - when curve not found, look also in secop_psi/calcurves - better error message when curve not readable - check that data points are monotonic - auto create description if missing - some more minor stuff Change-Id: Iecc4dd3dda843b44391aa56272840472a61d4b2c Reviewed-on: https://forge.frm2.tum.de/review/c/sine2020/secop/playground/+/27909 Tested-by: Jenkins Automated Tests <pedersen+jenkins@frm2.tum.de> Reviewed-by: Markus Zolliker <markus.zolliker@psi.ch> Reviewed-by: Enrico Faulhaber <enrico.faulhaber@frm2.tum.de>
This commit is contained in:
parent
3fe44d32b1
commit
c1d42f0f02
@ -22,12 +22,12 @@
|
|||||||
|
|
||||||
import math
|
import math
|
||||||
import os
|
import os
|
||||||
from os.path import basename, exists, join
|
from os.path import basename, dirname, exists, join
|
||||||
|
|
||||||
import numpy as np
|
import numpy as np
|
||||||
from scipy.interpolate import splev, splrep # pylint: disable=import-error
|
from scipy.interpolate import splev, splrep # pylint: disable=import-error
|
||||||
|
|
||||||
from secop.core import Attached, BoolType, Parameter, Readable, StringType
|
from secop.core import Attached, BoolType, Parameter, Readable, StringType, FloatRange
|
||||||
|
|
||||||
|
|
||||||
def linear(x):
|
def linear(x):
|
||||||
@ -74,13 +74,18 @@ class Parser340(StdParser):
|
|||||||
def parse(self, line):
|
def parse(self, line):
|
||||||
"""scan header for data format"""
|
"""scan header for data format"""
|
||||||
if self.header:
|
if self.header:
|
||||||
if line.startswith("Data Format"):
|
key, _, value = line.partition(':')
|
||||||
dataformat = line.split(":")[1].strip()[0]
|
if value: # this is a header line, as it contains ':'
|
||||||
if dataformat == '4':
|
value = value.split()[0]
|
||||||
self.logx, self.logy = True, False # logOhm
|
key = ''.join(key.split()).lower()
|
||||||
elif dataformat == '5':
|
if key == 'dataformat':
|
||||||
self.logx, self.logy = True, True # logOhm, logK
|
if value == '4':
|
||||||
elif line.startswith("No."):
|
self.logx, self.logy = True, False # logOhm
|
||||||
|
elif value == '5':
|
||||||
|
self.logx, self.logy = True, True # logOhm, logK
|
||||||
|
elif value not in ('1', '2', '3'):
|
||||||
|
raise ValueError('invalid Data Format')
|
||||||
|
elif 'No.' in line:
|
||||||
self.header = False
|
self.header = False
|
||||||
return
|
return
|
||||||
super().parse(line)
|
super().parse(line)
|
||||||
@ -104,7 +109,9 @@ class CalCurve:
|
|||||||
calibname = sensopt.pop(0)
|
calibname = sensopt.pop(0)
|
||||||
_, dot, ext = basename(calibname).rpartition('.')
|
_, dot, ext = basename(calibname).rpartition('.')
|
||||||
kind = None
|
kind = None
|
||||||
for path in os.environ.get('FRAPPY_CALIB_PATH', '').split(','):
|
pathlist = os.environ.get('FRAPPY_CALIB_PATH', '').split(',')
|
||||||
|
pathlist.append(join(dirname(__file__), 'calcurves'))
|
||||||
|
for path in pathlist:
|
||||||
# first try without adding kind
|
# first try without adding kind
|
||||||
filename = join(path.strip(), calibname)
|
filename = join(path.strip(), calibname)
|
||||||
if exists(filename):
|
if exists(filename):
|
||||||
@ -134,13 +141,26 @@ class CalCurve:
|
|||||||
cls, args = KINDS.get(kind, (StdParser, {}))
|
cls, args = KINDS.get(kind, (StdParser, {}))
|
||||||
args.update(optargs)
|
args.update(optargs)
|
||||||
|
|
||||||
parser = cls(**args)
|
try:
|
||||||
with open(filename) as f:
|
parser = cls(**args)
|
||||||
for line in f:
|
with open(filename) as f:
|
||||||
parser.parse(line)
|
for line in f:
|
||||||
|
parser.parse(line)
|
||||||
|
except Exception as e:
|
||||||
|
raise ValueError('calib curve %s: %s' % (calibspec, e)) from e
|
||||||
self.convert_x = nplog if parser.logx else linear
|
self.convert_x = nplog if parser.logx else linear
|
||||||
self.convert_y = npexp if parser.logy else linear
|
self.convert_y = npexp if parser.logy else linear
|
||||||
self.spline = splrep(np.asarray(parser.xdata), np.asarray(parser.ydata), s=0)
|
x = np.asarray(parser.xdata)
|
||||||
|
y = np.asarray(parser.ydata)
|
||||||
|
if np.all(x[:-1] > x[1:]): # all decreasing
|
||||||
|
x = np.flip(x)
|
||||||
|
y = np.flip(y)
|
||||||
|
elif np.any(x[:-1] >= x[1:]): # some not increasing
|
||||||
|
raise ValueError('calib curve %s is not monotonic' % calibspec)
|
||||||
|
try:
|
||||||
|
self.spline = splrep(x, y, s=0, k=min(3, len(x) - 1))
|
||||||
|
except (ValueError, TypeError) as e:
|
||||||
|
raise ValueError('invalid calib curve %s' % calibspec) from e
|
||||||
|
|
||||||
def __call__(self, value):
|
def __call__(self, value):
|
||||||
"""convert value
|
"""convert value
|
||||||
@ -156,7 +176,7 @@ class Sensor(Readable):
|
|||||||
|
|
||||||
calib = Parameter('calibration name', datatype=StringType(), readonly=False)
|
calib = Parameter('calibration name', datatype=StringType(), readonly=False)
|
||||||
abs = Parameter('True: take abs(raw) before calib', datatype=BoolType(), readonly=False, default=True)
|
abs = Parameter('True: take abs(raw) before calib', datatype=BoolType(), readonly=False, default=True)
|
||||||
value = Parameter(unit='K')
|
value = Parameter(datatype=FloatRange(unit='K'))
|
||||||
pollinterval = Parameter(export=False)
|
pollinterval = Parameter(export=False)
|
||||||
status = Parameter(default=(Readable.Status.ERROR, 'unintialized'))
|
status = Parameter(default=(Readable.Status.ERROR, 'unintialized'))
|
||||||
|
|
||||||
@ -164,10 +184,17 @@ class Sensor(Readable):
|
|||||||
_value_error = None
|
_value_error = None
|
||||||
enablePoll = False
|
enablePoll = False
|
||||||
|
|
||||||
|
def checkProperties(self):
|
||||||
|
if 'description' not in self.propertyValues:
|
||||||
|
self.description = '_' # avoid complaining about missing description
|
||||||
|
super().checkProperties()
|
||||||
|
|
||||||
def initModule(self):
|
def initModule(self):
|
||||||
super().initModule()
|
super().initModule()
|
||||||
self._rawsensor.registerCallbacks(self, ['status']) # auto update status
|
self._rawsensor.registerCallbacks(self, ['status']) # auto update status
|
||||||
self._calib = CalCurve(self.calib)
|
self._calib = CalCurve(self.calib)
|
||||||
|
if self.description == '_':
|
||||||
|
self.description = '%r calibrated with curve %r' % (self.rawsensor, self.calib)
|
||||||
|
|
||||||
def write_calib(self, value):
|
def write_calib(self, value):
|
||||||
self._calib = CalCurve(value)
|
self._calib = CalCurve(value)
|
||||||
@ -175,7 +202,7 @@ class Sensor(Readable):
|
|||||||
|
|
||||||
def update_value(self, value):
|
def update_value(self, value):
|
||||||
if self.abs:
|
if self.abs:
|
||||||
value = abs(value)
|
value = abs(float(value))
|
||||||
self.value = self._calib(value)
|
self.value = self._calib(value)
|
||||||
self._value_error = None
|
self._value_error = None
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user