frappy/frappy_psi/calcurve.py
2025-03-07 07:37:11 +01:00

678 lines
26 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
# *****************************************************************************
"""Software calibration"""
import os
import re
from pathlib import Path
from os.path import basename, dirname, exists, join
import numpy as np
from scipy.interpolate import PchipInterpolator, CubicSpline, PPoly # pylint: disable=import-error
from frappy.errors import ProgrammingError, RangeError
from frappy.lib import clamp
def identity(x):
return x
def exp10(x):
return 10 ** np.array(x)
to_scale = {
'lin': identity,
'log': np.log10,
}
from_scale = {
'lin': identity,
'log': exp10,
}
TYPES = [ # lakeshore type, inp-type, loglog
('DT', 'si', False), # Si diode
('TG', 'gaalas', False), # GaAlAs diode
('PT', 'pt250', False), # platinum, 250 Ohm range
('PT', 'pt500', False), # platinum, 500 Ohm range
('PT', 'pt2500', False), # platinum, 2500 Ohm range
('RF', 'rhfe', False), # rhodium iron
('CC', 'c', True), # carbon, LakeShore acronym unknown
('CX', 'cernox', True), # Cernox
('RX', 'ruox', True), # rutheniumm oxide
('GE', 'ge', True), # germanium, LakeShore acronym unknown
]
OPTION_TYPE = {
'loglog': 0, # boolean
'extrange': 2, # tuple(min T, max T) for extrapolation
'calibrange': 2, # tuple(min T, max T)
}
class HasOptions:
def insert_option(self, key, value):
key = key.strip()
argtype = OPTION_TYPE.get(key, 1)
if argtype == 1: # one number or string
try:
value = float(value)
except ValueError:
value = value.strip()
elif argtype == 0:
if value.strip().lower() in ('false', '0'):
value = False
else:
value = True
else:
value = [float(f) for f in value.split(',')]
self.options[key] = value
class StdParser(HasOptions):
"""parser used for reading columns"""
def __init__(self, **options):
"""keys of options may be either 'x' or 'logx' and either 'y' or 'logy'
default is x=0, y=1
"""
if 'logx' in options:
self.xscale = 'log'
self.xcol = options.pop('logx')
else:
self.xscale = 'lin'
self.xcol = options.pop('x', 0)
if 'logy' in options:
self.yscale = 'log'
self.ycol = options.pop('logy')
else:
self.yscale = 'lin'
self.ycol = options.pop('y', 1)
self.xdata, self.ydata = [], []
self.options = options
self.invalid_lines = []
def parse(self, line):
"""get numbers from a line and put them to self.xdata / self.ydata"""
row = line.split()
try:
self.xdata.append(float(row[self.xcol]))
self.ydata.append(float(row[self.ycol]))
except (IndexError, ValueError):
self.invalid_lines.append(line)
return
def finish(self):
pass
class InpParser(StdParser):
"""M. Zollikers *.inp calcurve format"""
HEADERLINE = re.compile(r'#?(?:(\w+)\s*=\s*([^!# \t\n]*)|curv.*)')
INP_TYPES = {ityp: (ltyp, loglog) for ltyp, ityp, loglog in TYPES}
def __init__(self, **options):
options.update(x=0, y=1)
super().__init__(**options)
self.header = True
def parse(self, line):
"""scan header"""
if self.header:
match = self.HEADERLINE.match(line)
if match:
key, value = match.groups()
if key is None:
self.header = False
else:
key = key.lower()
value = value.strip()
if key == 'type':
type_, loglog = self.INP_TYPES.get(value.lower(), (None, None))
if type_ is not None:
self.options['type'] = type_
if loglog is not None:
self.options['loglog'] = loglog
else:
self.insert_option(key, value)
return
elif line.startswith('!'):
return
super().parse(line)
class Parser340(StdParser):
"""parser for LakeShore *.340 files"""
HEADERLINE = re.compile(r'([^:]*):\s*([^(]*)')
CALIBHIGH = dict(L=325, M=420, H=500, B=40)
def __init__(self, **options):
options.update(x=1, y=2)
super().__init__(**options)
self.header = True
def parse(self, line):
"""scan header"""
if self.header:
match = self.HEADERLINE.match(line)
if match:
key, value = match.groups()
key = ''.join(key.split()).lower()
value = value.strip()
if key == 'dataformat':
if value[0:1] == '4':
self.xscale, self.yscale = 'log', 'lin' # logOhm
self.options['loglog'] = True
elif value[0:1] == '5':
self.xscale, self.yscale = 'log', 'log' # logOhm, logK
self.options['loglog'] = True
elif value[0:1] in ('1', '2', '3'):
self.options['loglog'] = False
else:
raise ValueError('invalid Data Format')
self.options['scale'] = self.xscale + self.yscale
self.insert_option(key, value)
return
if 'No.' in line:
self.header = False
return
if len(line.split()) != 3:
return
super().parse(line)
if self.header and self.xdata:
# valid line detected
self.header = False
def finish(self):
model = self.options.get('sensormodel', '').split('-')
if model[0]:
self.options['type'] = model[0]
if 'calibrange' not in self.options:
if len(model) > 2:
try: # e.g. model[-1] == 1.4M -> calibrange = 1.4, 420
self.options['calibrange'] = float(model[-1][:-1]), self.CALIBHIGH[model[-1][-1]]
return
except (ValueError, KeyError):
pass
class CaldatParser(StdParser):
"""parser for format from sea/tcl/startup/calib_ext.tcl"""
def __init__(self, options):
options.update(x=1, y=2)
super().__init__(options)
PARSERS = {
"340": Parser340,
"inp": InpParser,
"caldat": CaldatParser,
"dat": StdParser, # lakeshore raw data *.dat format
}
def get_curve(newscale, curves):
"""get curve from curve cache (converts not existing ones)
:param newscale: the new scale to get
:param curves: a dict <scale> of <array> storing available scales
:return: the retrieved or converted curve
"""
if newscale in curves:
return curves[newscale]
for scale, array in curves.items():
curves[newscale] = curve = to_scale[newscale](from_scale[scale](array))
return curve
class CalCurve(HasOptions):
EXTRAPOLATION_AMOUNT = 0.1
MAX_EXTRAPOLATION_FACTOR = 2
filename = None # calibration file
def __init__(self, calibspec=None, *, x=None, y=None, cubic_spline=True, **options):
"""calibration curve
:param calibspec: a string with name or filename, options
lookup path for files in env. variable FRAPPY_CALIB_PATH
calibspec format:
[<full path> | <name>][,<key>=<value> ...]
for <key>/<value> as in parser arguments
:param x, y: x and y arrays (given instead of calibspec)
:param cubic_spline: set to False for always using Pchip interpolation
:param options: options for parsers
"""
self.options = options
if calibspec is None:
parser = StdParser()
parser.xdata = x
parser.ydata = y
self.calibname = 'custom'
else:
if x or y:
raise ProgrammingError('can not give both calibspec and x,y ')
sensopt = calibspec.split(',')
calibname = sensopt.pop(0)
self.calibname = basename(calibname)
head, dot, ext = self.calibname.rpartition('.')
if dot:
self.calibname = head
kind = None
pathlist = [Path(p.strip()) for p in os.environ.get('FRAPPY_CALIB_PATH', '').split(':')]
pathlist.append(Path(dirname(__file__)) / 'calcurves')
for path in pathlist:
# first try without adding kind
filename = path / calibname
if filename.exists():
kind = ext if dot else None
break
# then try adding all kinds as extension
for nam in calibname, calibname.upper(), calibname.lower():
for kind in PARSERS:
filename = path / f'{nam}.{kind}'
if exists(filename):
self.filename = filename
break
else:
continue
break
else:
continue
break
else:
raise FileNotFoundError(calibname)
sensopt = iter(sensopt)
for opt in sensopt:
key, _, value = opt.lower().partition('=')
if OPTION_TYPE.get(key) == 2:
self.options[key] = float(value), float(next(sensopt))
else:
self.insert_option(key, value)
kind = self.options.pop('kind', kind)
cls = PARSERS.get(kind, StdParser)
try:
parser = cls(**self.options)
with open(filename, encoding='utf-8') as f:
for line in f:
parser.parse(line)
parser.finish()
except Exception as e:
raise ValueError('error parsing calib curve %s %r' % (calibspec, e)) from e
# take defaults from parser options
self.options = dict(parser.options, **self.options)
x = np.asarray(parser.xdata)
y = np.asarray(parser.ydata)
if len(x) < 2:
raise ValueError('calib file %s has less than 2 points' % calibspec)
if x[0] > x[-1]:
x = np.flip(np.array(x))
y = np.flip(np.array(y))
else:
x = np.array(x)
y = np.array(y)
not_incr_idx = np.argwhere(x[1:] <= x[:-1])
if len(not_incr_idx):
raise RangeError('x not monotonic at x=%.4g' % x[not_incr_idx[0]])
self.ptc = y[-1] > y[0]
self.x = {parser.xscale: x}
self.y = {parser.yscale: y}
self.lin_forced = [parser.yscale == 'lin' and (y[0] <= 0 or y[-1] <= 0),
parser.xscale == 'lin' and x[0] <= 0]
if sum(self.lin_forced):
self.loglog = False
else:
self.loglog = self.options.get('loglog', y[0] > y[-1]) # loglog defaults to True for NTC
newscale = 'log' if self.loglog else 'lin'
self.scale = newscale
x = get_curve(newscale, self.x)
y = get_curve(newscale, self.y)
self.convert_x = to_scale[newscale]
self.convert_y = from_scale[newscale]
self.calibrange = self.options.get('calibrange')
self.extra_points = (0, 0)
self.cutted = False
if self.calibrange:
self.calibrange = sorted(self.calibrange)
# determine indices (ibeg, iend) of first and last well calibrated point
ylin = get_curve('lin', self.y)
beg, end = self.calibrange
if y[0] > y[-1]:
ylin = -ylin
beg, end = -end, -beg
ibeg, iend = np.searchsorted(ylin, (beg, end))
if ibeg > 0 and abs(ylin[ibeg-1] - beg) < 0.1 * (ylin[ibeg] - ylin[ibeg-1]):
# add previous point, if close
ibeg -= 1
if iend < len(ylin) and abs(ylin[iend] - end) < 0.1 * (ylin[iend] - ylin[iend-1]):
# add next point, if close
iend += 1
if self.options.get('cut', False):
self.cutted = True
x = x[ibeg:iend]
y = y[ibeg:iend]
self.x = {newscale: x}
self.y = {newscale: y}
ibeg = 0
iend = len(x)
else:
self.extra_points = ibeg, len(x) - iend
else:
ibeg = 0
iend = len(x)
ylin = get_curve('lin', self.y)
self.calibrange = tuple(sorted([ylin[0], ylin[-1]]))
if cubic_spline:
# fit well calibrated part with spline
# determine slopes of calibrated part with CubicSpline
spline = CubicSpline(x[ibeg:iend], y[ibeg:iend])
roots = spline.derivative().roots(extrapolate=False)
if len(roots):
cubic_spline = False
self.cubic_spline = cubic_spline
if cubic_spline:
coeff = spline.c
if self.extra_points:
p = PchipInterpolator(x, y).c
# use Pchip outside left and right of calibrange
# remark: first derivative at end of calibrange is not continuous
coeff = np.concatenate((p[:, :ibeg], coeff, p[:, iend-1:]), axis=1)
else:
spline = PchipInterpolator(x, y)
coeff = spline.c
# extrapolation extension
# linear extrapolation is more robust than spline extrapolation
x1, x2 = x[0], x[-1]
# take slope at end of calibrated range for extrapolation
slopes = spline([x[ibeg], x[iend-1]], 1)
for i, j in enumerate([ibeg, iend-2]):
# slope of last interval in calibrange
si = (y[j+1] - y[j])/(x[j+1] - x[j])
# make sure slope is not more than a factor 2 different
# from the slope calculated at the outermost calibrated intervals
slopes[i] = clamp(slopes[i], 2*si, 0.5 * si)
dx = 0.1 if self.loglog else (x2 - x1) * 0.1
xe = np.concatenate(([x1 - dx], x, [x2 + dx]))
# x3 = np.append(x, x2 + dx)
# y3 = np.append(y, y[-1] + slope * dx)
y0 = y[0] - slopes[0] * dx
coeff = np.concatenate(([[0], [0], [slopes[0]], [y0]], coeff, [[0], [0], [slopes[1]], [y[-1]]]), axis=1)
self.spline = PPoly(coeff, xe)
# ranges without extrapolation:
self.xrange = get_curve('lin', self.x)[[0, -1]]
self.yrange = sorted(get_curve('lin', self.y)[[0, -1]])
self.calibrange = [max(self.calibrange[0], self.yrange[0]),
min(self.calibrange[1], self.yrange[1])]
self.set_extrapolation()
# check
# ys = self.spline(xe)
# ye = np.concatenate(([y0], y, [y[-1] + slope2 * dx]))
# assert np.all(np.abs(ys - ye) < 1e-5 * (0.1 + np.abs(ys + ye)))
def set_extrapolation(self, extleft=None, extright=None):
"""set default extrapolation range for export method
:param extleft: y value for the lower end of the extrapolation
:param extright: y value for the upper end of the extrapolation
if arguments omitted or None are replaced by a default extrapolation scheme
on return self.extx and self.exty are set to the extrapolated ranges
"""
yc1, yc2 = self.calibrange
y1, y2 = to_scale[self.scale]([yc1, yc2])
d = (y2 - y1) * self.EXTRAPOLATION_AMOUNT
yex1, yex2 = tuple(from_scale[self.scale]([y1 - d, y2 + d]))
t1, t2 = tuple(from_scale[self.scale]([y1, y2]))
# raw units, excluding extrapolation points at end
xrng = self.spline.x[1], self.spline.x[-2]
# first and last point
yp1, yp2 = sorted(from_scale[self.scale](self.spline(xrng)))
xrng = from_scale[self.scale](xrng)
# limit by maximal factor
f = self.MAX_EXTRAPOLATION_FACTOR
# but ext range should be at least to the points in curve
self.exty = [min(yp1, max(yex1, min(t1 / f, t1 * f))),
max(yp2, min(yex2, max(t2 * f, t2 / f)))]
if extleft is not None:
self.exty[0] = min(extleft, yp1)
if extright is not None:
self.exty[1] = max(extright, yp2)
self.extx = sorted(self.invert(*yd) for yd in zip(self.exty, xrng))
# check that sensor range is not extended by more than a factor f
extnew = [max(self.extx[0], min(xrng[0] / f, xrng[0] * f)),
min(self.extx[1], max(xrng[1] / f, xrng[1] * f))]
if extnew != self.extx:
# need further reduction
self.extx = extnew
self.exty = sorted(self(extnew))
def convert(self, value):
"""convert a single value
return a tuple (converted value, boolean: was it clamped?)
"""
x = clamp(value, *self.extx)
return self(x), x == value
def __call__(self, value):
"""convert value or numpy array without checking extrapolation range"""
return self.convert_y(self.spline(self.convert_x(value)))
def invert(self, y, defaultx=None, xscale=True, yscale=True):
"""invert y, return defaultx if no solution is found"""
if yscale:
y = to_scale[self.scale](y)
r = self.spline.solve(y)
try:
if xscale:
return from_scale[self.scale](r[0])
return r[0]
except IndexError:
return defaultx
def interpolation_error(self, x0, x1, y0, y1, funx, funy, relerror, return_tuple=False):
"""calcualte interpoaltion error
:param x0: start of interval
:param x1: end of interval
:param y0: y at start of interval
:param y1: y at end of interval
:param funx: function to convert x from exported scale to internal scale
:param funy: function to convert y from internal scale to exported scale
:param relerror: True when the exported y scale is linear
:param return_tuple: True: return interpolation error as a tuple with two values
(without and with 3 additional points)
False: return one value without additional points
:return: relative deviation
"""
xspace = np.linspace(x0, x1, 9)
x = funx(xspace)
yr = self.spline(x)
yspline = funy(yr)
yinterp = y0 + np.linspace(0.0, y1 - y0, 9)
# difference between spline (at m points) and liner interpolation
diff = np.abs(yspline - yinterp)
# estimate of interpolation error with 4 sections:
# difference between spline (at m points) and linear interpolation between neighboring points
if relerror:
fact = 2 / (np.abs(y0) + np.abs(y1)) # division by zero can not happen, as y0 and y1 can not both be zero
else:
fact = 2.3 # difference is in log10 -> multiply by 1 / log10(e)
result = np.max(diff, axis=0) * fact
if return_tuple:
diff2 = np.abs(0.5 * (yspline[:-2:2] + yspline[2::2]) - funy(yr[1:-1:2]))
return result, np.max(diff2, axis=0) * fact
return result
def export(self, logformat=False, nmax=199, yrange=None, extrapolate=True, xlimits=None, nmin=199):
"""export curve for downloading to hardware
:param nmax: max number of points. if the number of given points is bigger,
the points with the lowest interpolation error are omitted
:param logformat: a list with two elements of None, True or False for x and y
True: use log, False: use lin, None: use log if self.loglog
values None are replaced with the effectively used format
False / True are replaced by [False, False] / [True, True]
default is False
:param yrange: to reduce or extrapolate to this interval (extrapolate is ignored when given)
:param extrapolate: a flag indicating whether the curves should be extrapolated
to the preset extrapolation range
:param xlimits: max x range
:param nmin: minimum number of points
:return: numpy array with 2 dimensions returning the curve
"""
if logformat in (True, False):
logformat = (logformat, logformat)
self.logformat = list(logformat)
try:
scales = []
for idx, logfmt in enumerate(logformat):
if logfmt and self.lin_forced[idx]:
raise ValueError('%s must contain positive values only' % 'xy'[idx])
self.logformat[idx] = linlog = self.loglog if logfmt is None else logfmt
scales.append('log' if linlog else 'lin')
xscale, yscale = scales
except (TypeError, AssertionError):
raise ValueError('logformat must be a 2 element sequence or a boolean')
xr = self.spline.x[1:-1] # raw units, excluding extrapolated points
x1, x2 = xmin, xmax = xr[0], xr[-1]
if extrapolate and not yrange:
yrange = self.exty
if yrange is not None:
xmin, xmax = sorted(self.invert(*yd, xscale=False) for yd in zip(yrange, [x1, x2]))
if xlimits is not None:
lim = to_scale[self.scale](xlimits)
xmin = clamp(xmin, *lim)
xmax = clamp(xmax, *lim)
# start and end index of calibrated range
ibeg, iend = self.extra_points[0], len(xr) - self.extra_points[1]
if xmin != x1 or xmax != x2:
i, j = np.searchsorted(xr, (xmin, xmax))
if abs(xr[i] - xmin) < 0.1 * (xr[i + 1] - xr[i]):
# remove first point, if close
i += 1
if abs(xr[j - 1] - xmax) < 0.1 * (xr[j - 1] - xr[j - 2]):
# remove last point, if close
j -= 1
offset = i - 1
xr = np.concatenate(([xmin], xr[i:j], [xmax]))
ibeg = max(0, ibeg - offset)
iend = min(len(xr), iend - offset)
yr = self.spline(xr)
# convert to exported scale
if xscale == self.scale:
xbwd = identity
x = xr
else:
if self.scale == 'log':
xfwd, xbwd = from_scale[self.scale], to_scale[self.scale]
else:
xfwd, xbwd = to_scale[xscale], from_scale[xscale]
x = xfwd(xr)
if yscale == self.scale:
yfwd = identity
y = yr
else:
if self.scale == 'log':
yfwd = from_scale[self.scale]
else:
yfwd = to_scale[yscale]
y = yfwd(yr)
self.deviation = None
nmin = min(nmin, nmax)
n = len(x)
relerror = yscale == 'lin'
if len(x) > nmax:
# reduce number of points, if needed
i, j = 1, n - 1 # index range for calculating interpolation deviation
deviation = np.zeros(n)
while True:
deviation[i:j] = self.interpolation_error(
x[i-1:j-1], x[i+1:j+1], y[i-1:j-1], y[i+1:j+1],
xbwd, yfwd, relerror)
# calculate interpolation error when a single point is omitted
if n <= nmax:
break
idx = np.argmin(deviation[1:-1]) + 1 # find index of the smallest error
y = np.delete(y, idx)
x = np.delete(x, idx)
deviation = np.delete(deviation, idx)
n = len(x)
# index range to recalculate
i, j = max(1, idx - 1), min(n - 1, idx + 1)
self.deviation = deviation # for debugging purposes
elif n < nmin:
if ibeg + 1 < iend:
diff1, diff4 = self.interpolation_error(
x[ibeg:iend - 1], x[ibeg + 1:iend], y[ibeg:iend - 1], y[ibeg + 1:iend],
xbwd, yfwd, relerror, return_tuple=True)
dif_target = 1e-4
sq4 = np.sqrt(diff4) * 4
sq1 = np.sqrt(diff1)
offset = 0.49
n_mid = nmax - len(x) + iend - ibeg - 1
# iteration to find a dif target resulting in no more than nmax points
while True:
scale = 1 / np.sqrt(dif_target)
# estimate number of intermediate points (float!) needed to reach dif_target
# number of points estimated from the result of the interpolation error with 4 sections
n4 = np.maximum(1, sq4 * scale)
# number of points estimated from the result of the interpolation error with 1 section
n1 = np.maximum(1, sq1 * scale)
# use n4 where n4 > 4, n1, where n1 < 1 and a weighted average in between
nn = np.select([n4 > 4, n1 > 1],
[n4, (n4 * (n1 - 1) + n1 * (4 - n4)) / (3 + n1 - n4)], n1)
n_tot = np.sum(np.rint(nn + offset))
extra = n_tot - n_mid
if extra <= 0:
break
dif_target *= (n_tot / n_mid) ** 2
xnew = [x[:ibeg]]
for x0, x1, ni in zip(x[ibeg:iend-1], x[ibeg+1:iend], np.rint(nn + offset)):
xnew.append(np.linspace(x0, x1, int(ni) + 1)[:-1])
xnew.append(x[iend-1:])
x = np.concatenate(xnew)
y = yfwd(self.spline(xbwd(x)))
# for debugging purposes:
self.deviation = self.interpolation_error(x[:-1], x[1:], y[:-1], y[1:], xbwd, yfwd, relerror)
return np.stack([x, y], axis=1)