#!/usr/bin/env python # -*- coding: utf-8 -*- # ***************************************************************************** # This program is free software; you can redistribute it and/or modify it under # the terms of the GNU General Public License as published by the Free Software # Foundation; either version 2 of the License, or (at your option) any later # version. # # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more # details. # # You should have received a copy of the GNU General Public License along with # this program; if not, write to the Free Software Foundation, Inc., # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # # Module authors: # Markus Zolliker # ***************************************************************************** """Software calibration""" import os import re from os.path import basename, dirname, exists, join import numpy as np from scipy.interpolate import PchipInterpolator, CubicSpline, PPoly # pylint: disable=import-error from frappy.errors import ProgrammingError, RangeError from frappy.lib import clamp to_scale = { 'lin': lambda x: x, 'log': lambda x: np.log10(x), } from_scale = { 'lin': lambda x: x, 'log': lambda x: 10 ** np.array(x), } TYPES = [ # lakeshore type, inp-type, loglog ('DT', 'si', False), # Si diode ('TG', 'gaalas', False), # GaAlAs diode ('PT', 'pt250', False), # platinum, 250 Ohm range ('PT', 'pt500', False), # platinum, 500 Ohm range ('PT', 'pt2500', False), # platinum, 2500 Ohm range ('RF', 'rhfe', False), # rhodium iron ('CC', 'c', True), # carbon, LakeShore acronym unknown ('CX', 'cernox', True), # Cernox ('RX', 'ruox', True), # rutheniumm oxide ('GE', 'ge', True), # germanium, LakeShore acronym unknown ] OPTION_TYPE = { 'loglog': 0, # boolean 'extrange': 2, # tuple(min T, max T for extrapolation 'calibrange': 2, # tuple(min T, max T) } class HasOptions: def insert_option(self, key, value): key = key.strip() argtype = OPTION_TYPE.get(key, 1) if argtype == 1: # one number or string try: value = float(value) except ValueError: value = value.strip() elif argtype == 0: if value.strip().lower() in ('false', '0'): value = False else: value = True else: value = [float(f) for f in value.split(',')] self.options[key] = value class StdParser(HasOptions): """parser used for reading columns""" def __init__(self, **options): """keys of options may be either 'x' or 'logx' and either 'y' or 'logy' default is x=0, y=1 """ if 'logx' in options: self.xscale = 'log' self.xcol = options.pop('logx') else: self.xscale = 'lin' self.xcol = options.pop('x', 0) if 'logy' in options: self.yscale = 'log' self.ycol = options.pop('logy') else: self.yscale = 'lin' self.ycol = options.pop('y', 1) self.xdata, self.ydata = [], [] self.options = options self.invalid_lines = [] def parse(self, line): """get numbers from a line and put them to self.xdata / self.ydata""" row = line.split() try: self.xdata.append(float(row[self.xcol])) self.ydata.append(float(row[self.ycol])) except (IndexError, ValueError): self.invalid_lines.append(line) return def finish(self): pass class InpParser(StdParser): """M. Zollikers *.inp calcurve format""" HEADERLINE = re.compile(r'#?(?:(\w+)\s*=\s*([^!# \t\n]*)|curv.*)') INP_TYPES = {ityp: (ltyp, loglog) for ltyp, ityp, loglog in TYPES} def __init__(self, **options): options.update(x=0, y=1) super().__init__(**options) self.header = True def parse(self, line): """scan header""" if self.header: match = self.HEADERLINE.match(line) if match: key, value = match.groups() if key is None: self.header = False else: key = key.lower() value = value.strip() if key == 'type': type_, loglog = self.INP_TYPES.get(value.lower(), (None, None)) if type_ is not None: self.options['type'] = type_ if loglog is not None: self.options['loglog'] = loglog else: self.insert_option(key, value) return elif line.startswith('!'): return super().parse(line) class Parser340(StdParser): """parser for LakeShore *.340 files""" HEADERLINE = re.compile(r'([^:]*):\s*([^(]*)') CALIBHIGH = dict(L=325, M=420, H=500, B=40) def __init__(self, **options): options.update(x=1, y=2) super().__init__(**options) self.header = True def parse(self, line): """scan header""" if self.header: match = self.HEADERLINE.match(line) if match: key, value = match.groups() key = ''.join(key.split()).lower() value = value.strip() if key == 'dataformat': if value[0:1] == '4': self.xscale, self.yscale = 'log', 'lin' # logOhm self.options['loglog'] = True elif value[0:1] == '5': self.xscale, self.yscale = 'log', 'log' # logOhm, logK self.options['loglog'] = True elif value[0:1] in ('1', '2', '3'): self.options['loglog'] = False else: raise ValueError('invalid Data Format') self.options['scale'] = self.xscale + self.yscale self.insert_option(key, value) return if 'No.' in line: self.header = False return if len(line.split()) != 3: return super().parse(line) if self.header and self.xdata: # valid line detected self.header = False def finish(self): model = self.options.get('sensormodel', '').split('-') if model[0]: self.options['type'] = model[0] if 'calibrange' not in self.options: if len(model) > 2: try: # e.g. model[-1] == 1.4M -> calibrange = 1.4, 420 self.options['calibrange'] = float(model[-1][:-1]), self.CALIBHIGH[model[-1][-1]] return except (ValueError, KeyError): pass class CaldatParser(StdParser): """parser for format from sea/tcl/startup/calib_ext.tcl""" def __init__(self, options): options.update(x=1, y=2) super().__init__(options) PARSERS = { "340": Parser340, "inp": InpParser, "caldat": CaldatParser, "dat": StdParser, # lakeshore raw data *.dat format } def check(x, y, islog): # check interpolation error yi = y[:-2] + (x[1:-1] - x[:-2]) * (y[2:] - y[:-2]) / (x[2:] - x[:-2]) if islog: return sum((yi - y[1:-1]) ** 2) return sum((np.log10(yi) - np.log10(y[1:-1])) ** 2) def get_curve(newscale, curves): """get curve from curve cache (converts not existing ones) :param newscale: the new scale to get :param curves: a dict of storing available scales :return: the retrieved or converted curve """ if newscale in curves: return curves[newscale] for scale, array in curves.items(): curves[newscale] = curve = to_scale[newscale](from_scale[scale](array)) return curve class CalCurve(HasOptions): EXTRAPOLATION_AMOUNT = 0.1 MAX_EXTRAPOLATION_FACTOR = 2 def __init__(self, calibspec=None, *, x=None, y=None, cubic_spline=True, **options): """calibration curve :param calibspec: a string with name or filename, options lookup path for files in env. variable FRAPPY_CALIB_PATH calibspec format: [ | ][,= ...] for / as in parser arguments :param x, y: x and y arrays (given instead of calibspec) :param cubic_split: set to False for always using Pchip interpolation :param options: options for parsers """ self.options = options if calibspec is None: parser = StdParser() parser.xdata = x parser.ydata = y else: if x or y: raise ProgrammingError('can not give both calibspec and x,y ') sensopt = calibspec.split(',') calibname = sensopt.pop(0) _, dot, ext = basename(calibname).rpartition('.') kind = None pathlist = os.environ.get('FRAPPY_CALIB_PATH', '').split(':') pathlist.append(join(dirname(__file__), 'calcurves')) for path in pathlist: # first try without adding kind filename = join(path.strip(), calibname) if exists(filename): kind = ext if dot else None break # then try adding all kinds as extension for nam in calibname, calibname.upper(), calibname.lower(): for kind in PARSERS: filename = join(path.strip(), '%s.%s' % (nam, kind)) if exists(filename): break else: continue break else: continue break else: raise FileNotFoundError(calibname) sensopt = iter(sensopt) for opt in sensopt: key, _, value = opt.lower().partition('=') if OPTION_TYPE.get(key) == 2: self.options[key] = float(value), float(next(sensopt)) else: self.insert_option(key, value) kind = self.options.pop('kind', kind) cls = PARSERS.get(kind, StdParser) try: parser = cls(**self.options) with open(filename, encoding='utf-8') as f: for line in f: parser.parse(line) parser.finish() except Exception as e: raise ValueError('error parsing calib curve %s %r' % (calibspec, e)) from e # take defaults from parser options self.options = dict(parser.options, **self.options) x = np.asarray(parser.xdata) y = np.asarray(parser.ydata) if len(x) < 2: raise ValueError('calib file %s has less than 2 points' % calibspec) if x[0] > x[-1]: x = np.flip(np.array(x)) y = np.flip(np.array(y)) else: x = np.array(x) y = np.array(y) not_incr_idx = np.argwhere(x[1:] <= x[:-1]) if len(not_incr_idx): raise RangeError('x not monotonic at x=%.4g' % x[not_incr_idx[0]]) self.x = {parser.xscale: x} self.y = {parser.yscale: y} self.lin_forced = [parser.yscale == 'lin' and (y[0] <= 0 or y[-1] <= 0), parser.xscale == 'lin' and x[0] <= 0] if sum(self.lin_forced): self.loglog = False else: self.loglog = self.options.get('loglog', y[0] > y[-1]) # loglog defaults to True for NTC newscale = 'log' if self.loglog else 'lin' self.scale = newscale x = get_curve(newscale, self.x) y = get_curve(newscale, self.y) self.convert_x = to_scale[newscale] self.convert_y = from_scale[newscale] self.calibrange = self.options.get('calibrange') dirty = set() self.extra_points = False self.cutted = False if self.calibrange: self.calibrange = sorted(self.calibrange) # determine indices (ibeg, iend) of first and last well calibrated point ylin = get_curve('lin', self.y) beg, end = self.calibrange if y[0] > y[-1]: ylin = -ylin beg, end = -end, -beg ibeg, iend = np.searchsorted(ylin, (beg, end)) if ibeg > 0 and abs(ylin[ibeg-1] - beg) < 0.1 * (ylin[ibeg] - ylin[ibeg-1]): # add previous point, if close ibeg -= 1 if iend < len(ylin) and abs(ylin[iend] - end) < 0.1 * (ylin[iend] - ylin[iend-1]): # add next point, if close iend += 1 if self.options.get('cut', False): self.cutted = True x = x[ibeg:iend] y = y[ibeg:iend] self.x = {newscale: x} self.y = {newscale: y} ibeg = 0 iend = len(x) dirty.add('xy') else: self.extra_points = ibeg, len(x) - iend else: ibeg = 0 iend = len(x) ylin = get_curve('lin', self.y) self.calibrange = tuple(sorted([ylin[0], ylin[-1]])) if cubic_spline: # fit well calibrated part with spline # determine slopes of calibrated part with CubicSpline spline = CubicSpline(x[ibeg:iend], y[ibeg:iend]) roots = spline.derivative().roots(extrapolate=False) if len(roots): cubic_spline = False self.cubic_spline = cubic_spline if cubic_spline: coeff = spline.c if self.extra_points: p = PchipInterpolator(x, y).c # use Pchip outside left and right of calibrange # remark: first derivative at end of calibrange is not continuous coeff = np.concatenate((p[:, :ibeg], coeff, p[:, iend-1:]), axis=1) else: spline = PchipInterpolator(x, y) coeff = spline.c # extrapolation extension # linear extrapolation is more robust than spline extrapolation x1, x2 = x[0], x[-1] # take slope at end of calibrated range for extrapolation slopes = spline([x[ibeg], x[iend-1]], 1) for i, j in enumerate([ibeg, iend-2]): # slope of last interval in calibrange si = (y[j+1] - y[j])/(x[j+1] - x[j]) # make sure slope is not more than a factor 2 different # from the slope calculated at the outermost calibrated intervals slopes[i] = clamp(slopes[i], 2*si, 0.5 * si) dx = 0.1 if self.loglog else (x2 - x1) * 0.1 xe = np.concatenate(([x1 - dx], x, [x2 + dx])) # x3 = np.append(x, x2 + dx) # y3 = np.append(y, y[-1] + slope * dx) y0 = y[0] - slopes[0] * dx coeff = np.concatenate(([[0], [0], [slopes[0]], [y0]], coeff, [[0], [0], [slopes[1]], [y[-1]]]), axis=1) self.spline = PPoly(coeff, xe) # ranges without extrapolation: self.xrange = get_curve('lin', self.x)[[0, -1]] self.yrange = sorted(get_curve('lin', self.y)[[0, -1]]) self.calibrange = [max(self.calibrange[0], self.yrange[0]), min(self.calibrange[1], self.yrange[1])] self.set_extrapolation() # check # ys = self.spline(xe) # ye = np.concatenate(([y0], y, [y[-1] + slope2 * dx])) # assert np.all(np.abs(ys - ye) < 1e-5 * (0.1 + np.abs(ys + ye))) def set_extrapolation(self, extleft=None, extright=None): """set default extrapolation range for export method :param extleft: y value for the lower end of the extrapolation :param extright: y value for the upper end of the extrapolation if arguments omitted or None are replaced by a default extrapolation scheme on return self.extx and self.exty are set to the extrapolated ranges """ yc1, yc2 = self.calibrange y1, y2 = to_scale[self.scale]([yc1, yc2]) d = (y2 - y1) * self.EXTRAPOLATION_AMOUNT yex1, yex2 = tuple(from_scale[self.scale]([y1 - d, y2 + d])) t1, t2 = tuple(from_scale[self.scale]([y1, y2])) # raw units, excluding extrapolation points at end xrng = self.spline.x[1], self.spline.x[-2] # first and last point yp1, yp2 = sorted(from_scale[self.scale](self.spline(xrng))) xrng = from_scale[self.scale](xrng) # limit by maximal factor f = self.MAX_EXTRAPOLATION_FACTOR # but ext range should be at least to the points in curve self.exty = [min(yp1, max(yex1, min(t1 / f, t1 * f))), max(yp2, min(yex2, max(t2 * f, t2 / f)))] if extleft is not None: self.exty[0] = min(extleft, yp1) if extright is not None: self.exty[1] = max(extright, yp2) self.extx = sorted(self.invert(*yd) for yd in zip(self.exty, xrng)) # check that sensor range is not extended by more than a factor f extnew = [max(self.extx[0], min(xrng[0] / f, xrng[0] * f)), min(self.extx[1], max(xrng[1] / f, xrng[1] * f))] if extnew != self.extx: # need further reduction self.extx = extnew self.exty = sorted(self(extnew)) def convert(self, value): """convert a single value return a tuple (converted value, boolean: was it clamped?) """ x = clamp(value, *self.extx) return self(x), x == value def __call__(self, value): """convert value or numpy array without checking extrapolation range""" return self.convert_y(self.spline(self.convert_x(value))) def invert(self, y, defaultx=None, xscale=True, yscale=True): """invert y, return defaultx if no solution is found""" if yscale: y = to_scale[self.scale](y) r = self.spline.solve(y) try: if xscale: return from_scale[self.scale](r[0]) return r[0] except IndexError: return defaultx def export(self, logformat=False, nmax=199, yrange=None, extrapolate=True, xlimits=None): """export curve for downloading to hardware :param nmax: max number of points. if the number of given points is bigger, the points with the lowest interpolation error are omitted :param logformat: a list with two elements of None, True or False True: use log, False: use line, None: use log if self.loglog values None are replaced with the effectively used format False / True are replaced by [False, False] / [True, True] default is False :param yrange: to reduce or extrapolate to this interval (extrapolate is ignored when given) :param extrapolate: a flag indicating whether the curves should be extrapolated to the preset extrapolation range :param xlimits: max x range :return: numpy array with 2 dimensions returning the curve """ if logformat in (True, False): logformat = [logformat, logformat] try: scales = [] for idx, logfmt in enumerate(logformat): if logfmt and self.lin_forced[idx]: raise ValueError('%s must contain positive values only' % 'xy'[idx]) logformat[idx] = linlog = self.loglog if logfmt is None else logfmt scales.append('log' if linlog else 'lin') xscale, yscale = scales except (TypeError, AssertionError): raise ValueError('logformat must be a 2 element list or a boolean') x = self.spline.x[1:-1] # raw units, excluding extrapolated points x1, x2 = xmin, xmax = x[0], x[-1] y1, y2 = sorted(self.spline([x1, x2])) if extrapolate and not yrange: yrange = self.exty if yrange is not None: xmin, xmax = sorted(self.invert(*yd, xscale=False) for yd in zip(yrange, [x1, x2])) if xlimits is not None: lim = to_scale[self.scale](xlimits) xmin = clamp(xmin, *lim) xmax = clamp(xmax, *lim) if xmin != x1 or xmax != x2: ibeg, iend = np.searchsorted(x, (xmin, xmax)) if abs(x[ibeg] - xmin) < 0.1 * (x[ibeg + 1] - x[ibeg]): # remove first point, if close ibeg += 1 if abs(x[iend - 1] - xmax) < 0.1 * (x[iend - 1] - x[iend - 2]): # remove last point, if close iend -= 1 x = np.concatenate(([xmin], x[ibeg:iend], [xmax])) y = self.spline(x) # convert to exported scale if xscale != self.scale: x = to_scale[xscale](from_scale[self.scale](x)) if yscale != self.scale: y = to_scale[yscale](from_scale[self.scale](y)) # reduce number of points, if needed n = len(x) i, j = 1, n - 1 # index range for calculating interpolation deviation deviation = np.zeros(n) while True: # calculate interpolation error when a single point is omitted ym = y[i-1:j-1] + (x[i:j] - x[i-1:j-1]) * (y[i+1:j+1] - y[i-1:j-1]) / (x[i+1:j+1] - x[i-1:j-1]) if yscale == 'log': deviation[i:j] = np.abs(ym - y[i:j]) else: deviation[i:j] = np.abs(ym - y[i:j]) / (np.abs(ym + y[i:j]) + 1e-10) if n <= nmax: break idx = np.argmin(deviation[1:-1]) + 1 # find index of the smallest error y = np.delete(y, idx) x = np.delete(x, idx) deviation = np.delete(deviation, idx) n -= 1 # index range to recalculate i, j = max(1, idx - 1), min(n - 1, idx + 1) self.deviation = deviation # for debugging purposes return np.stack([x, y], axis=1)