#!/usr/bin/env python # -*- coding: utf-8 -*- # ***************************************************************************** # This program is free software; you can redistribute it and/or modify it under # the terms of the GNU General Public License as published by the Free Software # Foundation; either version 2 of the License, or (at your option) any later # version. # # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more # details. # # You should have received a copy of the GNU General Public License along with # this program; if not, write to the Free Software Foundation, Inc., # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # # Module authors: # Markus Zolliker # ***************************************************************************** """Software calibration""" import os import re from pathlib import Path from os.path import basename, dirname, exists, join import numpy as np from scipy.interpolate import PchipInterpolator, CubicSpline, PPoly # pylint: disable=import-error from frappy.errors import ProgrammingError, RangeError from frappy.lib import clamp def identity(x): return x def exp10(x): return 10 ** np.array(x) to_scale = { 'lin': identity, 'log': np.log10, } from_scale = { 'lin': identity, 'log': exp10, } TYPES = [ # lakeshore type, inp-type, loglog ('DT', 'si', False), # Si diode ('TG', 'gaalas', False), # GaAlAs diode ('PT', 'pt250', False), # platinum, 250 Ohm range ('PT', 'pt500', False), # platinum, 500 Ohm range ('PT', 'pt2500', False), # platinum, 2500 Ohm range ('RF', 'rhfe', False), # rhodium iron ('CC', 'c', True), # carbon, LakeShore acronym unknown ('CX', 'cernox', True), # Cernox ('RX', 'ruox', True), # rutheniumm oxide ('GE', 'ge', True), # germanium, LakeShore acronym unknown ] OPTION_TYPE = { 'loglog': 0, # boolean 'extrange': 2, # tuple(min T, max T) for extrapolation 'calibrange': 2, # tuple(min T, max T) } class HasOptions: def insert_option(self, key, value): key = key.strip() argtype = OPTION_TYPE.get(key, 1) if argtype == 1: # one number or string try: value = float(value) except ValueError: value = value.strip() elif argtype == 0: if value.strip().lower() in ('false', '0'): value = False else: value = True else: value = [float(f) for f in value.split(',')] self.options[key] = value class StdParser(HasOptions): """parser used for reading columns""" def __init__(self, **options): """keys of options may be either 'x' or 'logx' and either 'y' or 'logy' default is x=0, y=1 """ if 'logx' in options: self.xscale = 'log' self.xcol = options.pop('logx') else: self.xscale = 'lin' self.xcol = options.pop('x', 0) if 'logy' in options: self.yscale = 'log' self.ycol = options.pop('logy') else: self.yscale = 'lin' self.ycol = options.pop('y', 1) self.xdata, self.ydata = [], [] self.options = options self.invalid_lines = [] def parse(self, line): """get numbers from a line and put them to self.xdata / self.ydata""" row = line.split() try: self.xdata.append(float(row[self.xcol])) self.ydata.append(float(row[self.ycol])) except (IndexError, ValueError): self.invalid_lines.append(line) return def finish(self): pass class InpParser(StdParser): """M. Zollikers *.inp calcurve format""" HEADERLINE = re.compile(r'#?(?:(\w+)\s*=\s*([^!# \t\n]*)|curv.*)') INP_TYPES = {ityp: (ltyp, loglog) for ltyp, ityp, loglog in TYPES} def __init__(self, **options): options.update(x=0, y=1) super().__init__(**options) self.header = True def parse(self, line): """scan header""" if self.header: match = self.HEADERLINE.match(line) if match: key, value = match.groups() if key is None: self.header = False else: key = key.lower() value = value.strip() if key == 'type': type_, loglog = self.INP_TYPES.get(value.lower(), (None, None)) if type_ is not None: self.options['type'] = type_ if loglog is not None: self.options['loglog'] = loglog else: self.insert_option(key, value) return elif line.startswith('!'): return super().parse(line) class Parser340(StdParser): """parser for LakeShore *.340 files""" HEADERLINE = re.compile(r'([^:]*):\s*([^(]*)') CALIBHIGH = dict(L=325, M=420, H=500, B=40) def __init__(self, **options): options.update(x=1, y=2) super().__init__(**options) self.header = True def parse(self, line): """scan header""" if self.header: match = self.HEADERLINE.match(line) if match: key, value = match.groups() key = ''.join(key.split()).lower() value = value.strip() if key == 'dataformat': if value[0:1] == '4': self.xscale, self.yscale = 'log', 'lin' # logOhm self.options['loglog'] = True elif value[0:1] == '5': self.xscale, self.yscale = 'log', 'log' # logOhm, logK self.options['loglog'] = True elif value[0:1] in ('1', '2', '3'): self.options['loglog'] = False else: raise ValueError('invalid Data Format') self.options['scale'] = self.xscale + self.yscale self.insert_option(key, value) return if 'No.' in line: self.header = False return if len(line.split()) != 3: return super().parse(line) if self.header and self.xdata: # valid line detected self.header = False def finish(self): model = self.options.get('sensormodel', '').split('-') if model[0]: self.options['type'] = model[0] if 'calibrange' not in self.options: if len(model) > 2: try: # e.g. model[-1] == 1.4M -> calibrange = 1.4, 420 self.options['calibrange'] = float(model[-1][:-1]), self.CALIBHIGH[model[-1][-1]] return except (ValueError, KeyError): pass class CaldatParser(StdParser): """parser for format from sea/tcl/startup/calib_ext.tcl""" def __init__(self, options): options.update(x=1, y=2) super().__init__(options) PARSERS = { "340": Parser340, "inp": InpParser, "caldat": CaldatParser, "dat": StdParser, # lakeshore raw data *.dat format } def get_curve(newscale, curves): """get curve from curve cache (converts not existing ones) :param newscale: the new scale to get :param curves: a dict of storing available scales :return: the retrieved or converted curve """ if newscale in curves: return curves[newscale] for scale, array in curves.items(): curves[newscale] = curve = to_scale[newscale](from_scale[scale](array)) return curve class CalCurve(HasOptions): EXTRAPOLATION_AMOUNT = 0.1 MAX_EXTRAPOLATION_FACTOR = 2 filename = None # calibration file def __init__(self, calibspec=None, *, x=None, y=None, cubic_spline=True, **options): """calibration curve :param calibspec: a string with name or filename, options lookup path for files in env. variable FRAPPY_CALIB_PATH calibspec format: [ | ][,= ...] for / as in parser arguments :param x, y: x and y arrays (given instead of calibspec) :param cubic_spline: set to False for always using Pchip interpolation :param options: options for parsers """ self.options = options if calibspec is None: parser = StdParser() parser.xdata = x parser.ydata = y self.calibname = 'custom' else: if x or y: raise ProgrammingError('can not give both calibspec and x,y ') sensopt = calibspec.split(',') calibname = sensopt.pop(0) self.calibname = basename(calibname) head, dot, ext = self.calibname.rpartition('.') if dot: self.calibname = head kind = None pathlist = [Path(p.strip()) for p in os.environ.get('FRAPPY_CALIB_PATH', '').split(':')] pathlist.append(Path(dirname(__file__)) / 'calcurves') for path in pathlist: # first try without adding kind filename = path / calibname if filename.exists(): kind = ext if dot else None break # then try adding all kinds as extension for nam in calibname, calibname.upper(), calibname.lower(): for kind in PARSERS: filename = path / f'{nam}.{kind}' if exists(filename): self.filename = filename break else: continue break else: continue break else: raise FileNotFoundError(calibname) sensopt = iter(sensopt) for opt in sensopt: key, _, value = opt.lower().partition('=') if OPTION_TYPE.get(key) == 2: self.options[key] = float(value), float(next(sensopt)) else: self.insert_option(key, value) kind = self.options.pop('kind', kind) cls = PARSERS.get(kind, StdParser) try: parser = cls(**self.options) with open(filename, encoding='utf-8') as f: for line in f: parser.parse(line) parser.finish() except Exception as e: raise ValueError('error parsing calib curve %s %r' % (calibspec, e)) from e # take defaults from parser options self.options = dict(parser.options, **self.options) x = np.asarray(parser.xdata) y = np.asarray(parser.ydata) if len(x) < 2: raise ValueError('calib file %s has less than 2 points' % calibspec) if x[0] > x[-1]: x = np.flip(np.array(x)) y = np.flip(np.array(y)) else: x = np.array(x) y = np.array(y) not_incr_idx = np.argwhere(x[1:] <= x[:-1]) if len(not_incr_idx): raise RangeError('x not monotonic at x=%.4g' % x[not_incr_idx[0]]) self.ptc = y[-1] > y[0] self.x = {parser.xscale: x} self.y = {parser.yscale: y} self.lin_forced = [parser.yscale == 'lin' and (y[0] <= 0 or y[-1] <= 0), parser.xscale == 'lin' and x[0] <= 0] if sum(self.lin_forced): self.loglog = False else: self.loglog = self.options.get('loglog', y[0] > y[-1]) # loglog defaults to True for NTC newscale = 'log' if self.loglog else 'lin' self.scale = newscale x = get_curve(newscale, self.x) y = get_curve(newscale, self.y) self.convert_x = to_scale[newscale] self.convert_y = from_scale[newscale] self.calibrange = self.options.get('calibrange') self.extra_points = (0, 0) self.cutted = False if self.calibrange: self.calibrange = sorted(self.calibrange) # determine indices (ibeg, iend) of first and last well calibrated point ylin = get_curve('lin', self.y) beg, end = self.calibrange if y[0] > y[-1]: ylin = -ylin beg, end = -end, -beg ibeg, iend = np.searchsorted(ylin, (beg, end)) if ibeg > 0 and abs(ylin[ibeg-1] - beg) < 0.1 * (ylin[ibeg] - ylin[ibeg-1]): # add previous point, if close ibeg -= 1 if iend < len(ylin) and abs(ylin[iend] - end) < 0.1 * (ylin[iend] - ylin[iend-1]): # add next point, if close iend += 1 if self.options.get('cut', False): self.cutted = True x = x[ibeg:iend] y = y[ibeg:iend] self.x = {newscale: x} self.y = {newscale: y} ibeg = 0 iend = len(x) else: self.extra_points = ibeg, len(x) - iend else: ibeg = 0 iend = len(x) ylin = get_curve('lin', self.y) self.calibrange = tuple(sorted([ylin[0], ylin[-1]])) if cubic_spline: # fit well calibrated part with spline # determine slopes of calibrated part with CubicSpline spline = CubicSpline(x[ibeg:iend], y[ibeg:iend]) roots = spline.derivative().roots(extrapolate=False) if len(roots): cubic_spline = False self.cubic_spline = cubic_spline if cubic_spline: coeff = spline.c if self.extra_points: p = PchipInterpolator(x, y).c # use Pchip outside left and right of calibrange # remark: first derivative at end of calibrange is not continuous coeff = np.concatenate((p[:, :ibeg], coeff, p[:, iend-1:]), axis=1) else: spline = PchipInterpolator(x, y) coeff = spline.c # extrapolation extension # linear extrapolation is more robust than spline extrapolation x1, x2 = x[0], x[-1] # take slope at end of calibrated range for extrapolation slopes = spline([x[ibeg], x[iend-1]], 1) for i, j in enumerate([ibeg, iend-2]): # slope of last interval in calibrange si = (y[j+1] - y[j])/(x[j+1] - x[j]) # make sure slope is not more than a factor 2 different # from the slope calculated at the outermost calibrated intervals slopes[i] = clamp(slopes[i], 2*si, 0.5 * si) dx = 0.1 if self.loglog else (x2 - x1) * 0.1 xe = np.concatenate(([x1 - dx], x, [x2 + dx])) # x3 = np.append(x, x2 + dx) # y3 = np.append(y, y[-1] + slope * dx) y0 = y[0] - slopes[0] * dx coeff = np.concatenate(([[0], [0], [slopes[0]], [y0]], coeff, [[0], [0], [slopes[1]], [y[-1]]]), axis=1) self.spline = PPoly(coeff, xe) # ranges without extrapolation: self.xrange = get_curve('lin', self.x)[[0, -1]] self.yrange = sorted(get_curve('lin', self.y)[[0, -1]]) self.calibrange = [max(self.calibrange[0], self.yrange[0]), min(self.calibrange[1], self.yrange[1])] self.set_extrapolation() # check # ys = self.spline(xe) # ye = np.concatenate(([y0], y, [y[-1] + slope2 * dx])) # assert np.all(np.abs(ys - ye) < 1e-5 * (0.1 + np.abs(ys + ye))) def set_extrapolation(self, extleft=None, extright=None): """set default extrapolation range for export method :param extleft: y value for the lower end of the extrapolation :param extright: y value for the upper end of the extrapolation if arguments omitted or None are replaced by a default extrapolation scheme on return self.extx and self.exty are set to the extrapolated ranges """ yc1, yc2 = self.calibrange y1, y2 = to_scale[self.scale]([yc1, yc2]) d = (y2 - y1) * self.EXTRAPOLATION_AMOUNT yex1, yex2 = tuple(from_scale[self.scale]([y1 - d, y2 + d])) t1, t2 = tuple(from_scale[self.scale]([y1, y2])) # raw units, excluding extrapolation points at end xrng = self.spline.x[1], self.spline.x[-2] # first and last point yp1, yp2 = sorted(from_scale[self.scale](self.spline(xrng))) xrng = from_scale[self.scale](xrng) # limit by maximal factor f = self.MAX_EXTRAPOLATION_FACTOR # but ext range should be at least to the points in curve self.exty = [min(yp1, max(yex1, min(t1 / f, t1 * f))), max(yp2, min(yex2, max(t2 * f, t2 / f)))] if extleft is not None: self.exty[0] = min(extleft, yp1) if extright is not None: self.exty[1] = max(extright, yp2) self.extx = sorted(self.invert(*yd) for yd in zip(self.exty, xrng)) # check that sensor range is not extended by more than a factor f extnew = [max(self.extx[0], min(xrng[0] / f, xrng[0] * f)), min(self.extx[1], max(xrng[1] / f, xrng[1] * f))] if extnew != self.extx: # need further reduction self.extx = extnew self.exty = sorted(self(extnew)) def convert(self, value): """convert a single value return a tuple (converted value, boolean: was it clamped?) """ x = clamp(value, *self.extx) return self(x), x == value def __call__(self, value): """convert value or numpy array without checking extrapolation range""" return self.convert_y(self.spline(self.convert_x(value))) def invert(self, y, defaultx=None, xscale=True, yscale=True): """invert y, return defaultx if no solution is found""" if yscale: y = to_scale[self.scale](y) r = self.spline.solve(y) try: if xscale: return from_scale[self.scale](r[0]) return r[0] except IndexError: return defaultx def interpolation_error(self, x0, x1, y0, y1, funx, funy, relerror, return_tuple=False): """calcualte interpoaltion error :param x0: start of interval :param x1: end of interval :param y0: y at start of interval :param y1: y at end of interval :param funx: function to convert x from exported scale to internal scale :param funy: function to convert y from internal scale to exported scale :param relerror: True when the exported y scale is linear :param return_tuple: True: return interpolation error as a tuple with two values (without and with 3 additional points) False: return one value without additional points :return: relative deviation """ xspace = np.linspace(x0, x1, 9) x = funx(xspace) yr = self.spline(x) yspline = funy(yr) yinterp = y0 + np.linspace(0.0, y1 - y0, 9) # difference between spline (at m points) and liner interpolation diff = np.abs(yspline - yinterp) # estimate of interpolation error with 4 sections: # difference between spline (at m points) and linear interpolation between neighboring points if relerror: fact = 2 / (np.abs(y0) + np.abs(y1)) # division by zero can not happen, as y0 and y1 can not both be zero else: fact = 2.3 # difference is in log10 -> multiply by 1 / log10(e) result = np.max(diff, axis=0) * fact if return_tuple: diff2 = np.abs(0.5 * (yspline[:-2:2] + yspline[2::2]) - funy(yr[1:-1:2])) return result, np.max(diff2, axis=0) * fact return result def export(self, logformat=False, nmax=199, yrange=None, extrapolate=True, xlimits=None, nmin=199): """export curve for downloading to hardware :param nmax: max number of points. if the number of given points is bigger, the points with the lowest interpolation error are omitted :param logformat: a list with two elements of None, True or False for x and y True: use log, False: use lin, None: use log if self.loglog values None are replaced with the effectively used format False / True are replaced by [False, False] / [True, True] default is False :param yrange: to reduce or extrapolate to this interval (extrapolate is ignored when given) :param extrapolate: a flag indicating whether the curves should be extrapolated to the preset extrapolation range :param xlimits: max x range :param nmin: minimum number of points :return: numpy array with 2 dimensions returning the curve """ if logformat in (True, False): logformat = (logformat, logformat) self.logformat = list(logformat) try: scales = [] for idx, logfmt in enumerate(logformat): if logfmt and self.lin_forced[idx]: raise ValueError('%s must contain positive values only' % 'xy'[idx]) self.logformat[idx] = linlog = self.loglog if logfmt is None else logfmt scales.append('log' if linlog else 'lin') xscale, yscale = scales except (TypeError, AssertionError): raise ValueError('logformat must be a 2 element sequence or a boolean') xr = self.spline.x[1:-1] # raw units, excluding extrapolated points x1, x2 = xmin, xmax = xr[0], xr[-1] if extrapolate and not yrange: yrange = self.exty if yrange is not None: xmin, xmax = sorted(self.invert(*yd, xscale=False) for yd in zip(yrange, [x1, x2])) if xlimits is not None: lim = to_scale[self.scale](xlimits) xmin = clamp(xmin, *lim) xmax = clamp(xmax, *lim) # start and end index of calibrated range ibeg, iend = self.extra_points[0], len(xr) - self.extra_points[1] if xmin != x1 or xmax != x2: i, j = np.searchsorted(xr, (xmin, xmax)) if abs(xr[i] - xmin) < 0.1 * (xr[i + 1] - xr[i]): # remove first point, if close i += 1 if abs(xr[j - 1] - xmax) < 0.1 * (xr[j - 1] - xr[j - 2]): # remove last point, if close j -= 1 offset = i - 1 xr = np.concatenate(([xmin], xr[i:j], [xmax])) ibeg = max(0, ibeg - offset) iend = min(len(xr), iend - offset) yr = self.spline(xr) # convert to exported scale if xscale == self.scale: xbwd = identity x = xr else: if self.scale == 'log': xfwd, xbwd = from_scale[self.scale], to_scale[self.scale] else: xfwd, xbwd = to_scale[xscale], from_scale[xscale] x = xfwd(xr) if yscale == self.scale: yfwd = identity y = yr else: if self.scale == 'log': yfwd = from_scale[self.scale] else: yfwd = to_scale[yscale] y = yfwd(yr) self.deviation = None nmin = min(nmin, nmax) n = len(x) relerror = yscale == 'lin' if len(x) > nmax: # reduce number of points, if needed i, j = 1, n - 1 # index range for calculating interpolation deviation deviation = np.zeros(n) while True: deviation[i:j] = self.interpolation_error( x[i-1:j-1], x[i+1:j+1], y[i-1:j-1], y[i+1:j+1], xbwd, yfwd, relerror) # calculate interpolation error when a single point is omitted if n <= nmax: break idx = np.argmin(deviation[1:-1]) + 1 # find index of the smallest error y = np.delete(y, idx) x = np.delete(x, idx) deviation = np.delete(deviation, idx) n = len(x) # index range to recalculate i, j = max(1, idx - 1), min(n - 1, idx + 1) self.deviation = deviation # for debugging purposes elif n < nmin: if ibeg + 1 < iend: diff1, diff4 = self.interpolation_error( x[ibeg:iend - 1], x[ibeg + 1:iend], y[ibeg:iend - 1], y[ibeg + 1:iend], xbwd, yfwd, relerror, return_tuple=True) dif_target = 1e-4 sq4 = np.sqrt(diff4) * 4 sq1 = np.sqrt(diff1) offset = 0.49 n_mid = nmax - len(x) + iend - ibeg - 1 # iteration to find a dif target resulting in no more than nmax points while True: scale = 1 / np.sqrt(dif_target) # estimate number of intermediate points (float!) needed to reach dif_target # number of points estimated from the result of the interpolation error with 4 sections n4 = np.maximum(1, sq4 * scale) # number of points estimated from the result of the interpolation error with 1 section n1 = np.maximum(1, sq1 * scale) # use n4 where n4 > 4, n1, where n1 < 1 and a weighted average in between nn = np.select([n4 > 4, n1 > 1], [n4, (n4 * (n1 - 1) + n1 * (4 - n4)) / (3 + n1 - n4)], n1) n_tot = np.sum(np.rint(nn + offset)) extra = n_tot - n_mid if extra <= 0: break dif_target *= (n_tot / n_mid) ** 2 xnew = [x[:ibeg]] for x0, x1, ni in zip(x[ibeg:iend-1], x[ibeg+1:iend], np.rint(nn + offset)): xnew.append(np.linspace(x0, x1, int(ni) + 1)[:-1]) xnew.append(x[iend-1:]) x = np.concatenate(xnew) y = yfwd(self.spline(xbwd(x))) # for debugging purposes: self.deviation = self.interpolation_error(x[:-1], x[1:], y[:-1], y[1:], xbwd, yfwd, relerror) return np.stack([x, y], axis=1)