diff --git a/calibtest.py b/calibtest.py new file mode 100644 index 0000000..fd2466d --- /dev/null +++ b/calibtest.py @@ -0,0 +1,22 @@ +import sys +import os +from glob import glob +from frappy_psi.calcurve import CalCurve + +os.chdir('/Users/zolliker/gitpsi/calcurves') + +if len(sys.argv) > 1: + calib = sys.argv[1] + c = CalCurve(calib) +else: + for file in sorted(glob('*.*')): + if file.endswith('.md') or file.endswith('.std'): + continue + try: + c = CalCurve(file) + xy = c.export() + print('%9.4g %12.7g %9.4g %9.4g %s' % (tuple(c.extx) + tuple(c.exty) + (file,))) + except Exception as e: + print(file, e) + calib = file + diff --git a/frappy_psi/calcurve.py b/frappy_psi/calcurve.py new file mode 100644 index 0000000..4083ff3 --- /dev/null +++ b/frappy_psi/calcurve.py @@ -0,0 +1,576 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# ***************************************************************************** +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free Software +# Foundation; either version 2 of the License, or (at your option) any later +# version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# this program; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Module authors: +# Markus Zolliker +# ***************************************************************************** +"""Software calibration""" + +import os +import re +from os.path import basename, dirname, exists, join + +import numpy as np +from scipy.interpolate import PchipInterpolator, CubicSpline, PPoly # pylint: disable=import-error + + +from frappy.errors import ProgrammingError, RangeError +from frappy.lib import clamp + +to_scale = { + 'lin': lambda x: x, + 'log': lambda x: np.log10(x), +} +from_scale = { + 'lin': lambda x: x, + 'log': lambda x: 10 ** np.array(x), +} +TYPES = [ # lakeshore type, inp-type, loglog + ('DT', 'si', False), # Si diode + ('TG', 'gaalas', False), # GaAlAs diode + ('PT', 'pt250', False), # platinum, 250 Ohm range + ('PT', 'pt500', False), # platinum, 500 Ohm range + ('PT', 'pt2500', False), # platinum, 2500 Ohm range + ('RF', 'rhfe', False), # rhodium iron + ('CC', 'c', True), # carbon, LakeShore acronym unknown + ('CX', 'cernox', True), # Cernox + ('RX', 'ruox', True), # rutheniumm oxide + ('GE', 'ge', True), # germanium, LakeShore acronym unknown +] + + +OPTION_TYPE = { + 'loglog': 0, # boolean + 'extrange': 2, # tuple(min T, max T for extrapolation + 'calibrange': 2, # tuple(min T, max T) +} + + +class HasOptions: + def insert_option(self, key, value): + key = key.strip() + argtype = OPTION_TYPE.get(key, 1) + if argtype == 1: # one number or string + try: + value = float(value) + except ValueError: + value = value.strip() + elif argtype == 0: + if value.strip().lower() in ('false', '0'): + value = False + else: + value = True + else: + value = [float(f) for f in value.split(',')] + self.options[key] = value + + +class StdParser(HasOptions): + """parser used for reading columns""" + def __init__(self, **options): + """keys of options may be either 'x' or 'logx' and either 'y' or 'logy' + + default is x=0, y=1 + """ + if 'logx' in options: + self.xscale = 'log' + self.xcol = options.pop('logx') + else: + self.xscale = 'lin' + self.xcol = options.pop('x', 0) + if 'logy' in options: + self.yscale = 'log' + self.ycol = options.pop('logy') + else: + self.yscale = 'lin' + self.ycol = options.pop('y', 1) + self.xdata, self.ydata = [], [] + self.options = options + self.invalid_lines = [] + + def parse(self, line): + """get numbers from a line and put them to self.xdata / self.ydata""" + row = line.split() + try: + self.xdata.append(float(row[self.xcol])) + self.ydata.append(float(row[self.ycol])) + except (IndexError, ValueError): + self.invalid_lines.append(line) + return + + def finish(self): + pass + + +class InpParser(StdParser): + """M. Zollikers *.inp calcurve format""" + HEADERLINE = re.compile(r'#?(?:(\w+)\s*=\s*([^!# \t\n]*)|curv.*)') + INP_TYPES = {ityp: (ltyp, loglog) for ltyp, ityp, loglog in TYPES} + + def __init__(self, **options): + options.update(x=0, y=1) + super().__init__(**options) + self.header = True + + def parse(self, line): + """scan header""" + if self.header: + match = self.HEADERLINE.match(line) + if match: + key, value = match.groups() + if key is None: + self.header = False + else: + key = key.lower() + value = value.strip() + if key == 'type': + type_, loglog = self.INP_TYPES.get(value.lower(), (None, None)) + if type_ is not None: + self.options['type'] = type_ + if loglog is not None: + self.options['loglog'] = loglog + else: + self.insert_option(key, value) + return + elif line.startswith('!'): + return + super().parse(line) + + +class Parser340(StdParser): + """parser for LakeShore *.340 files""" + HEADERLINE = re.compile(r'([^:]*):\s*([^(]*)') + CALIBHIGH = dict(L=325, M=420, H=500, B=40) + + def __init__(self, **options): + options.update(x=1, y=2) + super().__init__(**options) + self.header = True + + def parse(self, line): + """scan header""" + if self.header: + match = self.HEADERLINE.match(line) + if match: + key, value = match.groups() + key = ''.join(key.split()).lower() + value = value.strip() + if key == 'dataformat': + if value[0:1] == '4': + self.xscale, self.yscale = 'log', 'lin' # logOhm + self.options['loglog'] = True + elif value[0:1] == '5': + self.xscale, self.yscale = 'log', 'log' # logOhm, logK + self.options['loglog'] = True + elif value[0:1] in ('1', '2', '3'): + self.options['loglog'] = False + else: + raise ValueError('invalid Data Format') + self.options['scale'] = self.xscale + self.yscale + self.insert_option(key, value) + return + if 'No.' in line: + self.header = False + return + if len(line.split()) != 3: + return + super().parse(line) + if self.header and self.xdata: + # valid line detected + self.header = False + + def finish(self): + model = self.options.get('sensormodel', '').split('-') + if model[0]: + self.options['type'] = model[0] + if 'calibrange' not in self.options: + if len(model) > 2: + try: # e.g. model[-1] == 1.4M -> calibrange = 1.4, 420 + self.options['calibrange'] = float(model[-1][:-1]), self.CALIBHIGH[model[-1][-1]] + return + except (ValueError, KeyError): + pass + + +class CaldatParser(StdParser): + """parser for format from sea/tcl/startup/calib_ext.tcl""" + + def __init__(self, options): + options.update(x=1, y=2) + super().__init__(options) + + +PARSERS = { + "340": Parser340, + "inp": InpParser, + "caldat": CaldatParser, + "dat": StdParser, # lakeshore raw data *.dat format +} + + +def check(x, y, islog): + # check interpolation error + yi = y[:-2] + (x[1:-1] - x[:-2]) * (y[2:] - y[:-2]) / (x[2:] - x[:-2]) + if islog: + return sum((yi - y[1:-1]) ** 2) + return sum((np.log10(yi) - np.log10(y[1:-1])) ** 2) + + +def get_curve(newscale, curves): + """get curve from curve cache (converts not existing ones) + + :param newscale: the new scale to get + :param curves: a dict of storing available scales + :return: the retrieved or converted curve + """ + if newscale in curves: + return curves[newscale] + for scale, array in curves.items(): + curves[newscale] = curve = to_scale[newscale](from_scale[scale](array)) + return curve + + +class CalCurve(HasOptions): + EXTRAPOLATION_AMOUNT = 0.1 + MAX_EXTRAPOLATION_FACTOR = 2 + + def __init__(self, calibspec=None, *, x=None, y=None, cubic_spline=True, **options): + """calibration curve + + :param calibspec: a string with name or filename, options + lookup path for files in env. variable FRAPPY_CALIB_PATH + calibspec format: + [ | ][,= ...] + for / as in parser arguments + :param x, y: x and y arrays (given instead of calibspec) + :param cubic_split: set to False for always using Pchip interpolation + :param options: options for parsers + """ + self.options = options + if calibspec is None: + parser = StdParser() + parser.xdata = x + parser.ydata = y + else: + if x or y: + raise ProgrammingError('can not give both calibspec and x,y ') + sensopt = calibspec.split(',') + calibname = sensopt.pop(0) + _, dot, ext = basename(calibname).rpartition('.') + kind = None + pathlist = os.environ.get('FRAPPY_CALIB_PATH', '').split(':') + pathlist.append(join(dirname(__file__), 'calcurves')) + for path in pathlist: + # first try without adding kind + filename = join(path.strip(), calibname) + if exists(filename): + kind = ext if dot else None + break + # then try adding all kinds as extension + for nam in calibname, calibname.upper(), calibname.lower(): + for kind in PARSERS: + filename = join(path.strip(), '%s.%s' % (nam, kind)) + if exists(filename): + break + else: + continue + break + else: + continue + break + else: + raise FileNotFoundError(calibname) + sensopt = iter(sensopt) + for opt in sensopt: + key, _, value = opt.lower().partition('=') + if OPTION_TYPE.get(key) == 2: + self.options[key] = float(value), float(next(sensopt)) + else: + self.insert_option(key, value) + kind = self.options.pop('kind', kind) + cls = PARSERS.get(kind, StdParser) + try: + parser = cls(**self.options) + with open(filename, encoding='utf-8') as f: + for line in f: + parser.parse(line) + parser.finish() + except Exception as e: + raise ValueError('error parsing calib curve %s %r' % (calibspec, e)) from e + # take defaults from parser options + self.options = dict(parser.options, **self.options) + + x = np.asarray(parser.xdata) + y = np.asarray(parser.ydata) + if len(x) < 2: + raise ValueError('calib file %s has less than 2 points' % calibspec) + + if x[0] > x[-1]: + x = np.flip(np.array(x)) + y = np.flip(np.array(y)) + else: + x = np.array(x) + y = np.array(y) + not_incr_idx = np.argwhere(x[1:] <= x[:-1]) + if len(not_incr_idx): + raise RangeError('x not monotonic at x=%.4g' % x[not_incr_idx[0]]) + + self.x = {parser.xscale: x} + self.y = {parser.yscale: y} + self.lin_forced = [parser.yscale == 'lin' and (y[0] <= 0 or y[-1] <= 0), + parser.xscale == 'lin' and x[0] <= 0] + if sum(self.lin_forced): + self.loglog = False + else: + self.loglog = self.options.get('loglog', y[0] > y[-1]) # loglog defaults to True for NTC + newscale = 'log' if self.loglog else 'lin' + self.scale = newscale + x = get_curve(newscale, self.x) + y = get_curve(newscale, self.y) + self.convert_x = to_scale[newscale] + self.convert_y = from_scale[newscale] + self.calibrange = self.options.get('calibrange') + dirty = set() + self.extra_points = False + self.cutted = False + if self.calibrange: + self.calibrange = sorted(self.calibrange) + # determine indices (ibeg, iend) of first and last well calibrated point + ylin = get_curve('lin', self.y) + beg, end = self.calibrange + if y[0] > y[-1]: + ylin = -ylin + beg, end = -end, -beg + + ibeg, iend = np.searchsorted(ylin, (beg, end)) + if ibeg > 0 and abs(ylin[ibeg-1] - beg) < 0.1 * (ylin[ibeg] - ylin[ibeg-1]): + # add previous point, if close + ibeg -= 1 + if iend < len(ylin) and abs(ylin[iend] - end) < 0.1 * (ylin[iend] - ylin[iend-1]): + # add next point, if close + iend += 1 + if self.options.get('cut', False): + self.cutted = True + x = x[ibeg:iend] + y = y[ibeg:iend] + self.x = {newscale: x} + self.y = {newscale: y} + ibeg = 0 + iend = len(x) + dirty.add('xy') + else: + self.extra_points = ibeg, len(x) - iend + else: + ibeg = 0 + iend = len(x) + ylin = get_curve('lin', self.y) + self.calibrange = tuple(sorted([ylin[0], ylin[-1]])) + + if cubic_spline: + # fit well calibrated part with spline + # determine slopes of calibrated part with CubicSpline + spline = CubicSpline(x[ibeg:iend], y[ibeg:iend]) + roots = spline.derivative().roots(extrapolate=False) + if len(roots): + cubic_spline = False + + self.cubic_spline = cubic_spline + if cubic_spline: + coeff = spline.c + if self.extra_points: + p = PchipInterpolator(x, y).c + # use Pchip outside left and right of calibrange + # remark: first derivative at end of calibrange is not continuous + coeff = np.concatenate((p[:, :ibeg], coeff, p[:, iend-1:]), axis=1) + else: + spline = PchipInterpolator(x, y) + coeff = spline.c + # extrapolation extension + # linear extrapolation is more robust than spline extrapolation + x1, x2 = x[0], x[-1] + # take slope at end of calibrated range for extrapolation + slopes = spline([x[ibeg], x[iend-1]], 1) + for i, j in enumerate([ibeg, iend-2]): + # slope of last interval in calibrange + si = (y[j+1] - y[j])/(x[j+1] - x[j]) + # make sure slope is not more than a factor 2 different + # from the slope calculated at the outermost calibrated intervals + slopes[i] = clamp(slopes[i], 2*si, 0.5 * si) + dx = 0.1 if self.loglog else (x2 - x1) * 0.1 + xe = np.concatenate(([x1 - dx], x, [x2 + dx])) + # x3 = np.append(x, x2 + dx) + # y3 = np.append(y, y[-1] + slope * dx) + y0 = y[0] - slopes[0] * dx + coeff = np.concatenate(([[0], [0], [slopes[0]], [y0]], coeff, [[0], [0], [slopes[1]], [y[-1]]]), axis=1) + self.spline = PPoly(coeff, xe) + # ranges without extrapolation: + self.xrange = get_curve('lin', self.x)[[0, -1]] + self.yrange = sorted(get_curve('lin', self.y)[[0, -1]]) + self.calibrange = [max(self.calibrange[0], self.yrange[0]), + min(self.calibrange[1], self.yrange[1])] + self.set_extrapolation() + + # check + # ys = self.spline(xe) + # ye = np.concatenate(([y0], y, [y[-1] + slope2 * dx])) + # assert np.all(np.abs(ys - ye) < 1e-5 * (0.1 + np.abs(ys + ye))) + + def set_extrapolation(self, extleft=None, extright=None): + """set default extrapolation range for export method + + :param extleft: y value for the lower end of the extrapolation + :param extright: y value for the upper end of the extrapolation + + if arguments omitted or None are replaced by a default extrapolation scheme + + on return self.extx and self.exty are set to the extrapolated ranges + """ + yc1, yc2 = self.calibrange + y1, y2 = to_scale[self.scale]([yc1, yc2]) + d = (y2 - y1) * self.EXTRAPOLATION_AMOUNT + yex1, yex2 = tuple(from_scale[self.scale]([y1 - d, y2 + d])) + t1, t2 = tuple(from_scale[self.scale]([y1, y2])) + + # raw units, excluding extrapolation points at end + xrng = self.spline.x[1], self.spline.x[-2] + # first and last point + yp1, yp2 = sorted(from_scale[self.scale](self.spline(xrng))) + xrng = from_scale[self.scale](xrng) + + # limit by maximal factor + f = self.MAX_EXTRAPOLATION_FACTOR + # but ext range should be at least to the points in curve + self.exty = [min(yp1, max(yex1, min(t1 / f, t1 * f))), + max(yp2, min(yex2, max(t2 * f, t2 / f)))] + if extleft is not None: + self.exty[0] = min(extleft, yp1) + if extright is not None: + self.exty[1] = max(extright, yp2) + self.extx = sorted(self.invert(*yd) for yd in zip(self.exty, xrng)) + # check that sensor range is not extended by more than a factor f + extnew = [max(self.extx[0], min(xrng[0] / f, xrng[0] * f)), + min(self.extx[1], max(xrng[1] / f, xrng[1] * f))] + if extnew != self.extx: + # need further reduction + self.extx = extnew + self.exty = sorted(self(extnew)) + + def convert(self, value): + """convert a single value + + return a tuple (converted value, boolean: was it clamped?) + """ + x = clamp(value, *self.extx) + return self(x), x == value + + def __call__(self, value): + """convert value or numpy array without checking extrapolation range""" + return self.convert_y(self.spline(self.convert_x(value))) + + def invert(self, y, defaultx=None, xscale=True, yscale=True): + """invert y, return defaultx if no solution is found""" + if yscale: + y = to_scale[self.scale](y) + r = self.spline.solve(y) + try: + if xscale: + return from_scale[self.scale](r[0]) + return r[0] + except IndexError: + return defaultx + + def export(self, logformat=False, nmax=199, yrange=None, extrapolate=True, xlimits=None): + """export curve for downloading to hardware + + :param nmax: max number of points. if the number of given points is bigger, + the points with the lowest interpolation error are omitted + :param logformat: a list with two elements of None, True or False + True: use log, False: use line, None: use log if self.loglog + values None are replaced with the effectively used format + False / True are replaced by [False, False] / [True, True] + default is False + :param yrange: to reduce or extrapolate to this interval (extrapolate is ignored when given) + :param extrapolate: a flag indicating whether the curves should be extrapolated + to the preset extrapolation range + :param xlimits: max x range + :return: numpy array with 2 dimensions returning the curve + """ + + if logformat in (True, False): + logformat = [logformat, logformat] + try: + scales = [] + for idx, logfmt in enumerate(logformat): + if logfmt and self.lin_forced[idx]: + raise ValueError('%s must contain positive values only' % 'xy'[idx]) + logformat[idx] = linlog = self.loglog if logfmt is None else logfmt + scales.append('log' if linlog else 'lin') + xscale, yscale = scales + except (TypeError, AssertionError): + raise ValueError('logformat must be a 2 element list or a boolean') + + x = self.spline.x[1:-1] # raw units, excluding extrapolated points + x1, x2 = xmin, xmax = x[0], x[-1] + y1, y2 = sorted(self.spline([x1, x2])) + + if extrapolate and not yrange: + yrange = self.exty + if yrange is not None: + xmin, xmax = sorted(self.invert(*yd, xscale=False) for yd in zip(yrange, [x1, x2])) + if xlimits is not None: + lim = to_scale[self.scale](xlimits) + xmin = clamp(xmin, *lim) + xmax = clamp(xmax, *lim) + if xmin != x1 or xmax != x2: + ibeg, iend = np.searchsorted(x, (xmin, xmax)) + if abs(x[ibeg] - xmin) < 0.1 * (x[ibeg + 1] - x[ibeg]): + # remove first point, if close + ibeg += 1 + if abs(x[iend - 1] - xmax) < 0.1 * (x[iend - 1] - x[iend - 2]): + # remove last point, if close + iend -= 1 + x = np.concatenate(([xmin], x[ibeg:iend], [xmax])) + y = self.spline(x) + + # convert to exported scale + if xscale != self.scale: + x = to_scale[xscale](from_scale[self.scale](x)) + if yscale != self.scale: + y = to_scale[yscale](from_scale[self.scale](y)) + + # reduce number of points, if needed + n = len(x) + i, j = 1, n - 1 # index range for calculating interpolation deviation + deviation = np.zeros(n) + while True: + # calculate interpolation error when a single point is omitted + ym = y[i-1:j-1] + (x[i:j] - x[i-1:j-1]) * (y[i+1:j+1] - y[i-1:j-1]) / (x[i+1:j+1] - x[i-1:j-1]) + if yscale == 'log': + deviation[i:j] = np.abs(ym - y[i:j]) + else: + deviation[i:j] = np.abs(ym - y[i:j]) / (np.abs(ym + y[i:j]) + 1e-10) + if n <= nmax: + break + idx = np.argmin(deviation[1:-1]) + 1 # find index of the smallest error + y = np.delete(y, idx) + x = np.delete(x, idx) + deviation = np.delete(deviation, idx) + n -= 1 + # index range to recalculate + i, j = max(1, idx - 1), min(n - 1, idx + 1) + self.deviation = deviation # for debugging purposes + return np.stack([x, y], axis=1)