''' utilities for creating cernox calibration curves ''' import numpy as np import matplotlib.pyplot as plt from scipy.interpolate import splrep, splev import math from glob import glob import os import time nplog = np.vectorize(math.log10) npexp = np.vectorize(lambda x:10 ** x) class StdFilter(object): '''filter used for reading columns''' def __init__(self, colnums = None, logformat = None): if colnums is None: colnums = (0,1) self.colnums = colnums self.logformat = logformat self.output = [[] for i in colnums] def parse(self, line): '''get numbers from a line and put them to self.output''' if line.strip().startswith('#'): return values = [] row = line.split() try: for c in self.colnums: values.append(float(row[c])) except (IndexError, ValueError): #print('SKIP: %s' % line.strip()) return self.posttreat(values) def posttreat(self, values): '''post treatment, mainly converting from log''' if self.logformat: values = values[:] for c in self.logformat: values[c] = 10 ** values[c] for out, val in zip(self.output, values): out.append(val) class Filter340(StdFilter): '''filter for LakeShore *.340 files''' def __init__(self): self.logformat = None self.header = True self.output = [[], []] def parse(self, line): '''scan header for data format''' if line.startswith('#'): return if self.header: if line.startswith("Data Format"): dataformat = line.split(":")[1].strip()[0] if dataformat == '4': self.logformat = [0] elif dataformat == '5': self.logformat = [0, 1] elif line.startswith("No."): self.header = False # print('HDR: %s' % line.strip()) return try: no, r, t = line.split() self.posttreat([float(r),float(t)]) except ValueError: # print('SKIP: %s' % line.strip()) return except OverflowError: # print('OVERFLOW:', no, r, t) pass KINDS = { # lakeshore 340 format "z340": dict(path="*/%s.340", filter=Filter340), "ls340": dict(path="/afs/psi.ch/project/SampleEnvironment/SE_internal/Thermometer_calibs/*/*/%s.340:/home/l_samenv/sea/tcl/calcurves/%s.340", filter=Filter340), # markus zollikers *.inp calcurve format "inp": dict(path="/home/l_samenv/sea/tcl/calcurves/%s.inp", filter=StdFilter), # format from sea/tcl/startup/calib_ext.tcl "zdat": dict(path="/home/l_samenv/sea/calib_scripts/calib_data/%s.dat", filter=StdFilter, args=dict(colnums=(1,2))), # lakeshore raw data *.dat format "lsdat": dict(path="/afs/psi.ch/project/SampleEnvironment/SE_internal/Thermometer_calibs/*/*/%s.dat", filter=StdFilter, args=dict(colnums=(1,0))), } def read_curve(filename, kind, instance=None, **filterargs): '''general curve reading''' path = '%s' filelist = [] for path in KINDS[kind]["path"].split(':'): filelist += glob(path % filename) filelist = sorted(filelist) if instance is None : if len(filelist) > 1: for i,file in enumerate(filelist): print(i,file) raise ValueError('instance number needed') instance = 0 if len(filelist) > 0: filename = filelist[instance] try: args = KINDS[kind]["args"] except KeyError: args = {} args.update(filterargs) try: filter = KINDS[kind]["filter"](**args) except KeyError: filter = StdFilter(**args) print(kind, "READ %s" % filename) with open(filename) as f: curves = [[] for c in filter.output] for line in f: filter.parse(line) return [np.asarray(c) for c in filter.output] def convert_res(res1, res2, resc1, log1=True, log2=True, smooth=0): '''interpolate (by default in log scale) the curve res1, res2 from sensors 1,2 resc1: input for values with sensor 1 return nvalues: interpolated values for sensor 2 may also be used for converting resistivity to T or vice versa ''' if log1: res1 = nplog(res1) resc1 = nplog(resc1) if log2: res2 = nplog(res2) if res1[-1] < res1[0]: res1 = res1[::-1] res2 = res2[::-1] for r1,r2 in zip(res1[:-1], res1[1:]): if r1 >= r2: print(10 ** res1) raise ValueError('not increasing X') fun = splrep(res1, res2, s=smooth) # extrapolation outside the range only linear! beg, end = res1[0], res1[-1] slopebeg = splev(beg, fun, der=1) slopeend = splev(end, fun, der=1) resc2 = np.where(resc1 < beg, res2[0] + slopebeg * (resc1 - beg), np.where(resc1 > end, res2[-1] + slopeend * (resc1 - end), splev(resc1, fun))) if log2: resc2 = npexp(resc2) return resc2 def compare_calib(res1, res2, test1, test2, log1=True, log2=True): '''compare two curves the number of points returned is len(test1) ''' calc2 = convert_res(res1, res2, test1, log1, log2) dif = (calc2 - test2) / test2 for i, d in enumerate(dif): if test1[i] < 2: print(test1[i], test2[i], calc2[i], dif[i]) elif abs(d) > 0.1: print(test1[i], test2[i], calc2[i], dif[i]) return dif def make_calib(r_dat, t_dat, r_ref, r_test, t_points, smooth=0): '''create calibration curve r_dat,t_dat: data from known sensor r_ref, r_test: resisitvities measured t_points: points to be used for the output. ''' t_cal = np.asarray(t_points) t_cal.sort() r_dat_cal = convert_res(t_dat, r_dat, t_cal) r_cal = convert_res(r_ref, r_test, r_dat_cal, smooth=smooth) return r_cal, t_cal def logrange(first, last, decade_step=None, n=None, include_last=True): if n == 0: result = () else: loginterval = math.log10(last) - math.log10(first) if n is None: if isinstance(decade_step, int): raise ValueError('decade_step must be a float. you may also use n=') if decade_step is None: decade_step = 0.1 n = int(round(abs(loginterval) / abs(decade_step))) else: if decade_step is not None: raise ValueError('can not specify both n and decade_step') if not isinstance(n, int): raise ValueError('n must be an integer. you may also use decade_step') decade_step = loginterval / n offset = math.log10(first) result = tuple(10 ** (offset + i * decade_step) for i in range(n)) if include_last: result += (last,) return result def smooth(xx, yy, smooth): if xx[0] > xx[-1]: xxl = -nplog(xx) else: xxl = nplog(xx) return (xx, npexp(splev(xxl, splrep(xxl, nplog(yy), s=smooth)))) class Measurement(object): def __init__(self, run=None, sensor=None): self.ref = [None] * 3 self.tst = [None] * 3 if run is not None: self.read(run, sensor) def read(self, run, sensor): sensor.caldat_file = [None] * 3 for j in range(3): sensor.caldat_file[j] = run.calib_data_file % ( run.outputoptions['caldate'], j, sensor.channel) self.ref[j], self.tst[j] = read_curve(sensor.caldat_file[j], 'zdat') self.n = min([len(self.ref[j]) for j in range(3)]) def reduced(self, step): meas = Measurement() meas.n = ((self.n - 1) / step) + 1 indices = np.round(np.linspace(0,self.n-1,meas.n),0).astype(int) for j in range(3): meas.ref[j] = self.ref[j][indices] meas.tst[j] = self.tst[j][indices] return meas class Sensor(object): HEADER = '''Sensor Model: %s %s\r Serial Number: %s\r Data Format: %d (Log Ohms/Kelvin)\r SetPoint Limit: %.1f (Kelvin)\r Temperature coefficient: 1 (Negative)\r calibrange : %g,%g\r Number of Breakpoints: %d\r \r No. Units Temperature (K)\r \r ''' def __init__(self, channel, serialno, model='CX-unknown'): self.channel = channel self.serialno = serialno self.model = model def make_calib(self, run, meas, diflim=0.001, smooth=0): '''make calibration algorithm: calculate pairwise difference of samples 0,1,2 the two samples with the biggest difference are omitted for each temperature, or, if the biggest difference is < diflim, the average is used ''' dif = [0,0,0] for j in range(3): dif[j] = compare_calib(meas.ref[(j+1)%3], meas.tst[(j+1)%3], meas.ref[(j+2)%3], meas.tst[(j+2)%3]) refbest = np.zeros(meas.n) tstbest = np.zeros(meas.n) m = 0 for i in range(meas.n): jbest = 2 for j in range(3): if abs(dif[(j+1) % 3][i]) < abs(dif[j][i]) > abs(dif[(j+2) % 3][i]): jbest = j break if abs(dif[jbest][i]) > diflim: refbest[i] = meas.ref[jbest][i] tstbest[i] = meas.tst[jbest][i] m += 1 else: refbest[i] = sum(meas.ref[j][i] for j in range(3)) / 3.0 tstbest[i] = sum(meas.tst[j][i] for j in range(3)) / 3.0 print(dict(averaged=meas.n-m, selected=m)) self.rcal, self.tcal = make_calib(run.rref, run.tref, refbest, tstbest, run.t_points, smooth=smooth) self.calibrange = convert_res(self.rcal, self.tcal, [tstbest[0], tstbest[-1]]) return (self.rcal, self.tcal) def write(self, rc, tc, logR=True, logT=False, path='%s/%s.340', caldate=None): '''write a calibration curve in LakeShores 340 format logT, logR: use logT/logT for output path: for output, might contain %s to be replaced by the sensor serial no caldate: date of measurement (default: today) ''' try: dataformat = {(True,True): 5, (True,False): 4, (False,False): 3}[(logR,logT)] except KeyError: raise ValueError("logT without logR is not possible") if caldate is None: caldate = time.strftime("%Y-%m-%d") try: path = path % (caldate, self.serialno) except TypeError: pass try: path = path % self.serialno except TypeError: pass if not os.path.isdir(os.path.dirname(path)): os.makedirs(os.path.dirname(path)) self.outputpath = path self.outputkind = 'z340' if logT: tc = nplog(tc) if logR: rc = nplog(rc) for arr in (tc, rc): if np.all(np.diff(arr) > 0) or np.all(np.diff(arr) < 0): continue raise ValueError('ERROR: data for %s is not monotonic' % self.serialno) with open(path, 'w') as f: f.write(self.HEADER % (self.model, caldate, self.serialno, dataformat, tc[-1], self.calibrange[0], self.calibrange[1], len(tc))) for i,(r,t) in enumerate(sorted(zip(rc, tc))): f.write("%3d %#10.7g %#10.7g\r\n" % (i+1,r,t)) class CalibRun(object): def __init__(self, sensors, # a list of sensors t_points=(1.0, 1.2) + logrange(1.4, 310, n=195) + (330,), caldate=None, logT=False, logR=True, calib_data_file='calib_data/calib%s_p%d_c%d.dat', outputpath='%s/%s.340'): self.refsensor = sensors[0] self.sensors = sensors[1:] self.sensdict = dict((s.serialno, s) for s in sensors) self.t_points = t_points self.calib_data_file = calib_data_file self.outputoptions = dict(caldate=caldate, logT=logT, logR=logR, path=outputpath) def make(self, diflim=0.001, smoothref=1e-7, smoothtst=1e-7, writeFile=True): '''write optimized calibration curves, selecting the best samples for each temperatures ''' self.rref, self.tref = read_curve(self.refsensor.serialno, 'lsdat') self.rref, self.tref = smooth(self.rref, self.tref, smooth=smoothref) for sensor in self.sensors: sensor.meas = Measurement(self, sensor) rc, tc = sensor.make_calib(self, sensor.meas, diflim, smoothtst) if writeFile: try: sensor.write(rc, tc, **self.outputoptions) except ValueError as e: print(e)