add frappy_psi/calcurve.py and calibtest.py
Change-Id: I5b9aae0ac3bcd76d846c08717201e6c32df4b675
This commit is contained in:
parent 7dfb2ff4e3
commit f9a0fdf7e4
calibtest.py | 22 | Normal file
@@ -0,0 +1,22 @@
import sys
import os
from glob import glob
from frappy_psi.calcurve import CalCurve

os.chdir('/Users/zolliker/gitpsi/calcurves')

if len(sys.argv) > 1:
    calib = sys.argv[1]
    c = CalCurve(calib)
else:
    for file in sorted(glob('*.*')):
        if file.endswith('.md') or file.endswith('.std'):
            continue
        try:
            c = CalCurve(file)
            xy = c.export()
            print('%9.4g %12.7g %9.4g %9.4g %s' % (tuple(c.extx) + tuple(c.exty) + (file,)))
        except Exception as e:
            print(file, e)
            calib = file
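Usage sketch (the curve file name is hypothetical; the script depends on the
hard-coded calcurves checkout above):

    python calibtest.py x12345.340   # check a single curve
    python calibtest.py              # sweep all curves, print their extended x/y ranges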
frappy_psi/calcurve.py | 576 | Normal file
@@ -0,0 +1,576 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
#   Markus Zolliker <markus.zolliker@psi.ch>
# *****************************************************************************
"""Software calibration"""
import os
import re
from os.path import basename, dirname, exists, join

import numpy as np
from scipy.interpolate import PchipInterpolator, CubicSpline, PPoly  # pylint: disable=import-error

from frappy.errors import ProgrammingError, RangeError
from frappy.lib import clamp

to_scale = {
    'lin': lambda x: x,
    'log': lambda x: np.log10(x),
}
from_scale = {
    'lin': lambda x: x,
    'log': lambda x: 10 ** np.array(x),
}
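# Round-trip sketch for the two lookup tables above (values illustrative):
#
#     r = np.array([100., 1000.])
#     logr = to_scale['log'](r)                      # -> array([2., 3.])
#     assert np.allclose(from_scale['log'](logr), r)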
TYPES = [  # lakeshore type, inp-type, loglog
    ('DT', 'si', False),      # Si diode
    ('TG', 'gaalas', False),  # GaAlAs diode
    ('PT', 'pt250', False),   # platinum, 250 Ohm range
    ('PT', 'pt500', False),   # platinum, 500 Ohm range
    ('PT', 'pt2500', False),  # platinum, 2500 Ohm range
    ('RF', 'rhfe', False),    # rhodium iron
    ('CC', 'c', True),        # carbon, LakeShore acronym unknown
    ('CX', 'cernox', True),   # Cernox
    ('RX', 'ruox', True),     # ruthenium oxide
    ('GE', 'ge', True),       # germanium, LakeShore acronym unknown
]


OPTION_TYPE = {
    'loglog': 0,      # boolean
    'extrange': 2,    # tuple(min T, max T) for extrapolation
    'calibrange': 2,  # tuple(min T, max T)
}
class HasOptions:
    def insert_option(self, key, value):
        key = key.strip()
        argtype = OPTION_TYPE.get(key, 1)
        if argtype == 1:  # one number or string
            try:
                value = float(value)
            except ValueError:
                value = value.strip()
        elif argtype == 0:  # boolean
            value = value.strip().lower() not in ('false', '0')
        else:  # list of numbers
            value = [float(f) for f in value.split(',')]
        self.options[key] = value
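# Coercion sketch for insert_option, following OPTION_TYPE (keys/values
# illustrative, using the StdParser defined below):
#
#     p = StdParser()
#     p.insert_option('loglog', 'false')        # argtype 0 -> False
#     p.insert_option('calibrange', '1.5,300')  # argtype 2 -> [1.5, 300.0]
#     p.insert_option('unit', 'Ohm')            # argtype 1 -> 'Ohm'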
class StdParser(HasOptions):
    """parser used for reading columns"""
    def __init__(self, **options):
        """keys of options may be either 'x' or 'logx' and either 'y' or 'logy'

        default is x=0, y=1
        """
        if 'logx' in options:
            self.xscale = 'log'
            self.xcol = options.pop('logx')
        else:
            self.xscale = 'lin'
            self.xcol = options.pop('x', 0)
        if 'logy' in options:
            self.yscale = 'log'
            self.ycol = options.pop('logy')
        else:
            self.yscale = 'lin'
            self.ycol = options.pop('y', 1)
        self.xdata, self.ydata = [], []
        self.options = options
        self.invalid_lines = []

    def parse(self, line):
        """get numbers from a line and append them to self.xdata / self.ydata"""
        row = line.split()
        try:
            self.xdata.append(float(row[self.xcol]))
            self.ydata.append(float(row[self.ycol]))
        except (IndexError, ValueError):
            self.invalid_lines.append(line)

    def finish(self):
        pass
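# Column parsing sketch (data illustrative):
#
#     p = StdParser(logx=0, y=1)  # x column already contains log10 values
#     for ln in ('2.0 4.2', '3.0 1.5', '! comment'):
#         p.parse(ln)
#     # p.xdata == [2.0, 3.0], p.ydata == [4.2, 1.5],
#     # the comment line ends up in p.invalid_lines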
class InpParser(StdParser):
    """M. Zolliker's *.inp calcurve format"""
    HEADERLINE = re.compile(r'#?(?:(\w+)\s*=\s*([^!# \t\n]*)|curv.*)')
    INP_TYPES = {ityp: (ltyp, loglog) for ltyp, ityp, loglog in TYPES}

    def __init__(self, **options):
        options.update(x=0, y=1)
        super().__init__(**options)
        self.header = True

    def parse(self, line):
        """scan header lines, then delegate data lines to StdParser"""
        if self.header:
            match = self.HEADERLINE.match(line)
            if match:
                key, value = match.groups()
                if key is None:  # a 'curv...' line: the header is finished
                    self.header = False
                else:
                    key = key.lower()
                    value = value.strip()
                    if key == 'type':
                        type_, loglog = self.INP_TYPES.get(value.lower(), (None, None))
                        if type_ is not None:
                            self.options['type'] = type_
                        if loglog is not None:
                            self.options['loglog'] = loglog
                    else:
                        self.insert_option(key, value)
                return
            if line.startswith('!'):  # comment line
                return
        super().parse(line)
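# Header sketch for the *.inp format (header lines hypothetical):
#
#     p = InpParser()
#     p.parse('type = cernox\n')  # -> options: type='CX', loglog=True
#     p.parse('curve\n')          # matches 'curv.*': the header is finished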
class Parser340(StdParser):
    """parser for LakeShore *.340 files"""
    HEADERLINE = re.compile(r'([^:]*):\s*([^(]*)')
    CALIBHIGH = dict(L=325, M=420, H=500, B=40)

    def __init__(self, **options):
        options.update(x=1, y=2)
        super().__init__(**options)
        self.header = True

    def parse(self, line):
        """scan header lines, then delegate data lines to StdParser"""
        if self.header:
            match = self.HEADERLINE.match(line)
            if match:
                key, value = match.groups()
                key = ''.join(key.split()).lower()
                value = value.strip()
                if key == 'dataformat':
                    if value[0:1] == '4':
                        self.xscale, self.yscale = 'log', 'lin'  # logOhm
                        self.options['loglog'] = True
                    elif value[0:1] == '5':
                        self.xscale, self.yscale = 'log', 'log'  # logOhm, logK
                        self.options['loglog'] = True
                    elif value[0:1] in ('1', '2', '3'):
                        self.options['loglog'] = False
                    else:
                        raise ValueError('invalid Data Format')
                    self.options['scale'] = self.xscale + self.yscale
                self.insert_option(key, value)
                return
            if 'No.' in line:
                self.header = False
                return
        if len(line.split()) != 3:
            return
        super().parse(line)
        if self.header and self.xdata:
            # valid data line detected: the header is finished
            self.header = False

    def finish(self):
        """derive sensor type and calibrange from the sensor model, if possible"""
        model = self.options.get('sensormodel', '').split('-')
        if model[0]:
            self.options['type'] = model[0]
        if 'calibrange' not in self.options and len(model) > 2:
            try:  # e.g. model[-1] == '1.4M' -> calibrange = 1.4, 420
                self.options['calibrange'] = float(model[-1][:-1]), self.CALIBHIGH[model[-1][-1]]
            except (ValueError, KeyError):
                pass
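# Sketch of the finish() heuristics above, expanding the '1.4M' example
# (model string illustrative): a header line 'Sensor Model: CX-1050-SD-1.4M'
# yields model == ['CX', '1050', 'SD', '1.4M'], so finish() sets
# options['type'] = 'CX' and options['calibrange'] = (1.4, 420).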
class CaldatParser(StdParser):
    """parser for format from sea/tcl/startup/calib_ext.tcl"""

    def __init__(self, **options):
        options.update(x=1, y=2)
        super().__init__(**options)
PARSERS = {
    "340": Parser340,
    "inp": InpParser,
    "caldat": CaldatParser,
    "dat": StdParser,  # lakeshore raw data *.dat format
}
def check(x, y, islog):
    # check the error of linear interpolation at each inner point from its neighbours
    yi = y[:-2] + (x[1:-1] - x[:-2]) * (y[2:] - y[:-2]) / (x[2:] - x[:-2])
    if islog:
        return sum((yi - y[1:-1]) ** 2)
    return sum((np.log10(yi) - np.log10(y[1:-1])) ** 2)
def get_curve(newscale, curves):
    """get curve from the curve cache (converting not yet existing ones)

    :param newscale: the new scale to get
    :param curves: a dict <scale> of <array> storing the available scales
    :return: the retrieved or converted curve
    """
    if newscale in curves:
        return curves[newscale]
    for scale, array in curves.items():
        curves[newscale] = curve = to_scale[newscale](from_scale[scale](array))
        return curve
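# Cache behaviour sketch (values illustrative):
#
#     curves = {'lin': np.array([1., 10., 100.])}
#     get_curve('log', curves)  # -> array([0., 1., 2.]), now also cached
#     'log' in curves           # -> True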
class CalCurve(HasOptions):
    EXTRAPOLATION_AMOUNT = 0.1
    MAX_EXTRAPOLATION_FACTOR = 2

    def __init__(self, calibspec=None, *, x=None, y=None, cubic_spline=True, **options):
        """calibration curve

        :param calibspec: a string with a name or file name, plus options
            lookup path for files is taken from env. variable FRAPPY_CALIB_PATH
            calibspec format:
            [<full path> | <name>][,<key>=<value> ...]
            with <key>/<value> as in parser arguments
        :param x, y: x and y arrays (given instead of calibspec)
        :param cubic_spline: set to False to always use Pchip interpolation
        :param options: options for parsers
        """
        self.options = options
        if calibspec is None:
            parser = StdParser()
            parser.xdata = x
            parser.ydata = y
        else:
            if x is not None or y is not None:
                raise ProgrammingError('can not give both calibspec and x, y')
            sensopt = calibspec.split(',')
            calibname = sensopt.pop(0)
            _, dot, ext = basename(calibname).rpartition('.')
            kind = None
            pathlist = os.environ.get('FRAPPY_CALIB_PATH', '').split(':')
            pathlist.append(join(dirname(__file__), 'calcurves'))
            for path in pathlist:
                # first try without adding a kind
                filename = join(path.strip(), calibname)
                if exists(filename):
                    kind = ext if dot else None
                    break
                # then try adding all kinds as extension
                for nam in calibname, calibname.upper(), calibname.lower():
                    for kind in PARSERS:
                        filename = join(path.strip(), '%s.%s' % (nam, kind))
                        if exists(filename):
                            break
                    else:
                        continue
                    break
                else:
                    continue
                break
            else:
                raise FileNotFoundError(calibname)
            sensopt = iter(sensopt)
            for opt in sensopt:
                key, _, value = opt.lower().partition('=')
                if OPTION_TYPE.get(key) == 2:
                    # a 2-tuple option consumes the next comma-separated item as well
                    self.options[key] = float(value), float(next(sensopt))
                else:
                    self.insert_option(key, value)
            kind = self.options.pop('kind', kind)
            cls = PARSERS.get(kind, StdParser)
            try:
                parser = cls(**self.options)
                with open(filename, encoding='utf-8') as f:
                    for line in f:
                        parser.parse(line)
                parser.finish()
            except Exception as e:
                raise ValueError('error parsing calib curve %s %r' % (calibspec, e)) from e
            # take defaults from parser options
            self.options = dict(parser.options, **self.options)

        x = np.asarray(parser.xdata)
        y = np.asarray(parser.ydata)
        if len(x) < 2:
            raise ValueError('calib file %s has fewer than 2 points' % calibspec)

        if x[0] > x[-1]:
            x = np.flip(np.array(x))
            y = np.flip(np.array(y))
        else:
            x = np.array(x)
            y = np.array(y)
        not_incr_idx = np.argwhere(x[1:] <= x[:-1])
        if len(not_incr_idx):
            raise RangeError('x not monotonic at x=%.4g' % x[not_incr_idx[0]])

        self.x = {parser.xscale: x}
        self.y = {parser.yscale: y}
        self.lin_forced = [parser.yscale == 'lin' and (y[0] <= 0 or y[-1] <= 0),
                           parser.xscale == 'lin' and x[0] <= 0]
        if sum(self.lin_forced):
            self.loglog = False
        else:
            self.loglog = self.options.get('loglog', y[0] > y[-1])  # loglog defaults to True for NTC
        newscale = 'log' if self.loglog else 'lin'
        self.scale = newscale
        x = get_curve(newscale, self.x)
        y = get_curve(newscale, self.y)
        self.convert_x = to_scale[newscale]
        self.convert_y = from_scale[newscale]
        self.calibrange = self.options.get('calibrange')
        dirty = set()
        self.extra_points = False
        self.cutted = False
        if self.calibrange:
            self.calibrange = sorted(self.calibrange)
            # determine indices (ibeg, iend) of the first and last well calibrated point
            ylin = get_curve('lin', self.y)
            beg, end = self.calibrange
            if y[0] > y[-1]:
                ylin = -ylin
                beg, end = -end, -beg

            ibeg, iend = np.searchsorted(ylin, (beg, end))
            if ibeg > 0 and abs(ylin[ibeg-1] - beg) < 0.1 * (ylin[ibeg] - ylin[ibeg-1]):
                # add previous point, if close
                ibeg -= 1
            if iend < len(ylin) and abs(ylin[iend] - end) < 0.1 * (ylin[iend] - ylin[iend-1]):
                # add next point, if close
                iend += 1
            if self.options.get('cut', False):
                self.cutted = True
                x = x[ibeg:iend]
                y = y[ibeg:iend]
                self.x = {newscale: x}
                self.y = {newscale: y}
                ibeg = 0
                iend = len(x)
                dirty.add('xy')
            else:
                self.extra_points = ibeg, len(x) - iend
        else:
            ibeg = 0
            iend = len(x)
            ylin = get_curve('lin', self.y)
            self.calibrange = tuple(sorted([ylin[0], ylin[-1]]))

        if cubic_spline:
            # fit the well calibrated part with a spline and
            # determine the slopes of the calibrated part with CubicSpline
            spline = CubicSpline(x[ibeg:iend], y[ibeg:iend])
            roots = spline.derivative().roots(extrapolate=False)
            if len(roots):
                # not monotonic: fall back to Pchip
                cubic_spline = False

        self.cubic_spline = cubic_spline
        if cubic_spline:
            coeff = spline.c
            if self.extra_points:
                p = PchipInterpolator(x, y).c
                # use Pchip left and right of the calibrange
                # remark: the first derivative at the ends of the calibrange is not continuous
                coeff = np.concatenate((p[:, :ibeg], coeff, p[:, iend-1:]), axis=1)
        else:
            spline = PchipInterpolator(x, y)
            coeff = spline.c
        # extrapolation extension:
        # linear extrapolation is more robust than spline extrapolation
        x1, x2 = x[0], x[-1]
        # take the slopes at the ends of the calibrated range for extrapolation
        slopes = spline([x[ibeg], x[iend-1]], 1)
        for i, j in enumerate([ibeg, iend-2]):
            # slope of the outermost interval in the calibrange
            si = (y[j+1] - y[j]) / (x[j+1] - x[j])
            # make sure the extrapolation slope differs by not more than a factor 2
            # from the slope of the outermost calibrated interval
            slopes[i] = clamp(slopes[i], 2 * si, 0.5 * si)
        dx = 0.1 if self.loglog else (x2 - x1) * 0.1
        xe = np.concatenate(([x1 - dx], x, [x2 + dx]))
        y0 = y[0] - slopes[0] * dx
        coeff = np.concatenate(([[0], [0], [slopes[0]], [y0]], coeff, [[0], [0], [slopes[1]], [y[-1]]]), axis=1)
        self.spline = PPoly(coeff, xe)
        # ranges without extrapolation:
        self.xrange = get_curve('lin', self.x)[[0, -1]]
        self.yrange = sorted(get_curve('lin', self.y)[[0, -1]])
        self.calibrange = [max(self.calibrange[0], self.yrange[0]),
                           min(self.calibrange[1], self.yrange[1])]
        self.set_extrapolation()

        # check (kept for debugging):
        # ys = self.spline(xe)
        # ye = np.concatenate(([y0], y, [y[-1] + slopes[1] * dx]))
        # assert np.all(np.abs(ys - ye) < 1e-5 * (0.1 + np.abs(ys + ye)))
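    # Construction sketch (file name hypothetical, data illustrative):
    #
    #     c = CalCurve('x12345.340,calibrange=1.5,300')  # from a file
    #     c = CalCurve(x=[1., 10., 100., 1000.],         # from data points;
    #                  y=[3000., 300., 30., 3.])         # decreasing y -> loglog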
    def set_extrapolation(self, extleft=None, extright=None):
        """set default extrapolation range for the export method

        :param extleft: y value for the lower end of the extrapolation
        :param extright: y value for the upper end of the extrapolation

        arguments omitted or None are replaced by a default extrapolation scheme

        on return, self.extx and self.exty are set to the extrapolated ranges
        """
        yc1, yc2 = self.calibrange
        y1, y2 = to_scale[self.scale]([yc1, yc2])
        d = (y2 - y1) * self.EXTRAPOLATION_AMOUNT
        yex1, yex2 = tuple(from_scale[self.scale]([y1 - d, y2 + d]))
        t1, t2 = tuple(from_scale[self.scale]([y1, y2]))

        # raw units, excluding the extrapolation points at the ends
        xrng = self.spline.x[1], self.spline.x[-2]
        # first and last point
        yp1, yp2 = sorted(from_scale[self.scale](self.spline(xrng)))
        xrng = from_scale[self.scale](xrng)

        # limit by the maximal factor,
        # but the ext range should reach at least to the points in the curve
        f = self.MAX_EXTRAPOLATION_FACTOR
        self.exty = [min(yp1, max(yex1, min(t1 / f, t1 * f))),
                     max(yp2, min(yex2, max(t2 * f, t2 / f)))]
        if extleft is not None:
            self.exty[0] = min(extleft, yp1)
        if extright is not None:
            self.exty[1] = max(extright, yp2)
        self.extx = sorted(self.invert(*yd) for yd in zip(self.exty, xrng))
        # check that the sensor range is not extended by more than a factor f
        extnew = [max(self.extx[0], min(xrng[0] / f, xrng[0] * f)),
                  min(self.extx[1], max(xrng[1] / f, xrng[1] * f))]
        if extnew != self.extx:
            # need further reduction
            self.extx = extnew
            self.exty = sorted(self(extnew))
    def convert(self, value):
        """convert a single value, clamped to the extrapolation range

        :return: a tuple (converted value, True if the value was within range,
            i.e. not clamped)
        """
        x = clamp(value, *self.extx)
        return self(x), x == value

    def __call__(self, value):
        """convert a value or numpy array without checking the extrapolation range"""
        return self.convert_y(self.spline(self.convert_x(value)))

    def invert(self, y, defaultx=None, xscale=True, yscale=True):
        """invert y, return defaultx if no solution is found"""
        if yscale:
            y = to_scale[self.scale](y)
        r = self.spline.solve(y)
        try:
            if xscale:
                return from_scale[self.scale](r[0])
            return r[0]
        except IndexError:
            return defaultx
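    # Conversion sketch (c as above): __call__ interpolates without a range
    # check, convert clamps to the extrapolated range, invert goes back:
    #
    #     y = c(50.)               # raw x -> y
    #     x = c.invert(y)          # -> approx. 50.0
    #     y2, ok = c.convert(50.)  # ok is True: not clamped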
    def export(self, logformat=False, nmax=199, yrange=None, extrapolate=True, xlimits=None):
        """export curve for downloading to hardware

        :param logformat: a list with two elements of None, True or False
            True: use log, False: use lin, None: use log if self.loglog
            values None are replaced with the effectively used format
            False / True are replaced by [False, False] / [True, True]
            default is False
        :param nmax: max number of points. if the number of given points is bigger,
            the points with the lowest interpolation error are omitted
        :param yrange: reduce or extrapolate to this interval (extrapolate is ignored when yrange is given)
        :param extrapolate: a flag indicating whether the curve should be extrapolated
            to the preset extrapolation range
        :param xlimits: max x range
        :return: numpy array with 2 dimensions containing the curve
        """
        if logformat in (True, False):
            logformat = [logformat, logformat]
        try:
            scales = []
            for idx, logfmt in enumerate(logformat):
                if logfmt and self.lin_forced[idx]:
                    raise ValueError('%s must contain positive values only' % 'xy'[idx])
                logformat[idx] = linlog = self.loglog if logfmt is None else logfmt
                scales.append('log' if linlog else 'lin')
            xscale, yscale = scales
        except (TypeError, AssertionError):
            raise ValueError('logformat must be a 2 element list or a boolean')

        x = self.spline.x[1:-1]  # raw units, excluding the extrapolated points
        x1, x2 = xmin, xmax = x[0], x[-1]
        y1, y2 = sorted(self.spline([x1, x2]))

        if extrapolate and not yrange:
            yrange = self.exty
        if yrange is not None:
            xmin, xmax = sorted(self.invert(*yd, xscale=False) for yd in zip(yrange, [x1, x2]))
        if xlimits is not None:
            lim = to_scale[self.scale](xlimits)
            xmin = clamp(xmin, *lim)
            xmax = clamp(xmax, *lim)
        if xmin != x1 or xmax != x2:
            ibeg, iend = np.searchsorted(x, (xmin, xmax))
            if abs(x[ibeg] - xmin) < 0.1 * (x[ibeg + 1] - x[ibeg]):
                # remove first point, if close
                ibeg += 1
            if abs(x[iend - 1] - xmax) < 0.1 * (x[iend - 1] - x[iend - 2]):
                # remove last point, if close
                iend -= 1
            x = np.concatenate(([xmin], x[ibeg:iend], [xmax]))
        y = self.spline(x)

        # convert to exported scale
        if xscale != self.scale:
            x = to_scale[xscale](from_scale[self.scale](x))
        if yscale != self.scale:
            y = to_scale[yscale](from_scale[self.scale](y))

        # reduce number of points, if needed
        n = len(x)
        i, j = 1, n - 1  # index range for calculating the interpolation deviation
        deviation = np.zeros(n)
        while True:
            # calculate interpolation error when a single point is omitted
            ym = y[i-1:j-1] + (x[i:j] - x[i-1:j-1]) * (y[i+1:j+1] - y[i-1:j-1]) / (x[i+1:j+1] - x[i-1:j-1])
            if yscale == 'log':
                deviation[i:j] = np.abs(ym - y[i:j])
            else:
                deviation[i:j] = np.abs(ym - y[i:j]) / (np.abs(ym + y[i:j]) + 1e-10)
            if n <= nmax:
                break
            idx = np.argmin(deviation[1:-1]) + 1  # find the index of the smallest error
            y = np.delete(y, idx)
            x = np.delete(x, idx)
            deviation = np.delete(deviation, idx)
            n -= 1
            # index range to recalculate
            i, j = max(1, idx - 1), min(n - 1, idx + 1)
        self.deviation = deviation  # for debugging purposes
        return np.stack([x, y], axis=1)
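# Export sketch (c as above): build a table for downloading to hardware,
# at most 199 points, x in the curve's own scale, y linear:
#
#     xy = c.export(logformat=[None, False], nmax=199)
#     # xy.shape == (n, 2) with n <= 199; c.deviation keeps the
#     # interpolation error estimates used when dropping points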