diff --git a/.gitignore b/.gitignore index c3f862e..f6bbd18 100644 --- a/.gitignore +++ b/.gitignore @@ -14,11 +14,27 @@ CVS /GraphServer0 /SeaServer0 /*_sea_logger.tar -/sea.tcl -/graph.tcl +/sea*.tcl +/graph*.tcl /history /gzlogger /log /logger /status - +/lab* +/prep* +/amor +/boa +/camea +/dmc +/eiger +/focus +/hifi +/hrpt +/morpheus +/poldi +/ppms +/sans +/seaman +/tasp +/zebra diff --git a/calib_scripts/zcalib.py b/calib_scripts/zcalib.py new file mode 100644 index 0000000..0afe87c --- /dev/null +++ b/calib_scripts/zcalib.py @@ -0,0 +1,350 @@ +''' +utilities for creating cernox calibration curves +''' + +import numpy as np +import matplotlib.pyplot as plt +from scipy.interpolate import splrep, splev +import math +from glob import glob +import os +import time + +nplog = np.vectorize(math.log10) +npexp = np.vectorize(lambda x:10 ** x) + +class StdFilter(object): + '''filter used for reading columns''' + + def __init__(self, colnums = None, logformat = None): + if colnums is None: + colnums = (0,1) + self.colnums = colnums + self.logformat = logformat + self.output = [[] for i in colnums] + + def parse(self, line): + '''get numbers from a line and put them to self.output''' + values = [] + row = line.split() + try: + for c in self.colnums: + values.append(float(row[c])) + except (IndexError, ValueError): + # print('SKIP: %s' % line.strip()) + return + self.posttreat(values) + + def posttreat(self, values): + '''post treatment, mainly converting from log''' + if self.logformat: + values = values[:] + for c in self.logformat: + values[c] = 10 ** values[c] + for out, val in zip(self.output, values): + out.append(val) + +class Filter340(StdFilter): + '''filter for LakeShore *.340 files''' + + def __init__(self): + self.logformat = None + self.header = True + self.output = [[], []] + + def parse(self, line): + '''scan header for data format''' + if self.header: + if line.startswith("Data Format"): + dataformat = line.split(":")[1].strip()[0] + if dataformat == '4': + self.logformat = [0] + elif dataformat == '5': + self.logformat = [0, 1] + elif line.startswith("No."): + self.header = False + # print('HDR: %s' % line.strip()) + return + try: + no, r, t = line.split() + self.posttreat([float(r),float(t)]) + except ValueError: + # print('SKIP: %s' % line.strip()) + return + except OverflowError: + # print('OVERFLOW:', no, r, t) + pass + +KINDS = { + # lakeshore 340 format + "z340": dict(path="*/%s.340", filter=Filter340), + "ls340": dict(path="/afs/psi.ch/project/SampleEnvironment/SE_internal/Thermometer_calibs/*/*/%s.340:/home/l_samenv/sea/tcl/calcurves/%s.340", filter=Filter340), + # markus zollikers *.inp calcurve format + "inp": dict(path="/home/l_samenv/sea/tcl/calcurves/%s.inp", filter=StdFilter), + # format from sea/tcl/startup/calib_ext.tcl + "zdat": dict(path="/home/l_samenv/sea/calib_scripts/calib_data/%s.dat", filter=StdFilter, args=dict(colnums=(1,2))), + # lakeshore raw data *.dat format + "lsdat": dict(path="/afs/psi.ch/project/SampleEnvironment/SE_internal/Thermometer_calibs/*/*/%s.dat", filter=StdFilter, args=dict(colnums=(1,0))), +} + +def read_curve(filename, kind, instance=None, **filterargs): + '''general curve reading''' + path = '%s' + filelist = [] + for path in KINDS[kind]["path"].split(':'): + filelist += glob(path % filename) + filelist = sorted(filelist) + if instance is None : + if len(filelist) > 1: + for i,file in enumerate(filelist): + print(i,file) + raise ValueError('instance number needed') + instance = 0 + if len(filelist) > 0: + filename = filelist[instance] + try: + args = KINDS[kind]["args"] + except KeyError: + args = {} + args.update(filterargs) + try: + filter = KINDS[kind]["filter"](**args) + except KeyError: + filter = StdFilter(**args) + print(kind, "READ %s" % filename) + + with open(filename) as f: + curves = [[] for c in filter.output] + for line in f: + filter.parse(line) + return [np.asarray(c) for c in filter.output] + +def convert_res(res1, res2, resc1, log1=True, log2=True, smooth=0): + '''interpolate (by default in log scale) the curve res1, res2 from sensors 1,2 + + resc1: input for values with sensor 1 + return nvalues: interpolated values for sensor 2 + + may also be used for converting resistivity to T or vice versa + ''' + if log1: + res1 = nplog(res1) + resc1 = nplog(resc1) + if log2: + res2 = nplog(res2) + if res1[-1] < res1[0]: + res1 = res1[::-1] + res2 = res2[::-1] + for r1,r2 in zip(res1[:-1], res1[1:]): + if r1 >= r2: + print(res1) + raise ValueError('not increasing X') + fun = splrep(res1, res2, s=smooth) + # extrapolation outside the range only linear! + beg, end = res1[0], res1[-1] + slopebeg = splev(beg, fun, der=1) + slopeend = splev(end, fun, der=1) + resc2 = np.where(resc1 < beg, res2[0] + slopebeg * (resc1 - beg), + np.where(resc1 > end, res2[-1] + slopeend * (resc1 - end), splev(resc1, fun))) + if log2: + resc2 = npexp(resc2) + return resc2 + +def compare_calib(res1, res2, test1, test2, log1=True, log2=True): + '''compare two curves + + the number of points returned is len(test1) + ''' + calc2 = convert_res(res1, res2, test1, log1, log2) + dif = (calc2 - test2) / test2 + for i, d in enumerate(dif): + if test1[i] < 2: + print(test1[i], test2[i], calc2[i], dif[i]) + elif abs(d) > 0.1: + print(test1[i], test2[i], calc2[i], dif[i]) + return dif + +def make_calib(r_dat, t_dat, r_ref, r_test, t_points, smooth=0): + '''create calibration curve + + r_dat,t_dat: data from known sensor + r_ref, r_test: resisitvities measured + t_points: points to be used for the output. + ''' + t_cal = np.asarray(t_points) + t_cal.sort() + r_dat_cal = convert_res(t_dat, r_dat, t_cal) + r_cal = convert_res(r_ref, r_test, r_dat_cal, smooth=smooth) + return r_cal, t_cal + +def logrange(first, last, decade_step=None, n=None, include_last=True): + if n == 0: + result = () + else: + loginterval = math.log10(last) - math.log10(first) + if n is None: + if isinstance(decade_step, int): + raise ValueError('decade_step must be a float. you may also use n=') + if decade_step is None: + decade_step = 0.1 + n = int(round(abs(loginterval) / abs(decade_step))) + else: + if decade_step is not None: + raise ValueError('can not specify both n and decade_step') + if not isinstance(n, int): + raise ValueError('n must be an integer. you may also use decade_step') + decade_step = loginterval / n + offset = math.log10(first) + result = tuple(10 ** (offset + i * decade_step) for i in range(n)) + if include_last: + result += (last,) + return result + +def smooth(xx, yy, smooth): + if xx[0] > xx[-1]: + xxl = -nplog(xx) + else: + xxl = nplog(xx) + return (xx, npexp(splev(xxl, splrep(xxl, nplog(yy), s=smooth)))) + + +class Measurement(object): + def __init__(self, run=None, sensor=None): + self.ref = [None] * 3 + self.tst = [None] * 3 + if run is not None: + self.read(run, sensor) + + def read(self, run, sensor): + sensor.caldat_file = [None] * 3 + for j in range(3): + sensor.caldat_file[j] = run.calib_data_file % ( + run.outputoptions['caldate'], j, sensor.channel) + self.ref[j], self.tst[j] = read_curve(sensor.caldat_file[j], 'zdat') + self.n = min([len(self.ref[j]) for j in range(3)]) + + def reduced(self, step): + meas = Measurement() + meas.n = ((self.n - 1) / step) + 1 + indices = np.round(np.linspace(0,self.n-1,meas.n),0).astype(int) + for j in range(3): + meas.ref[j] = self.ref[j][indices] + meas.tst[j] = self.tst[j][indices] + return meas + +class Sensor(object): + HEADER = '''Sensor Model: %s %s\r +Serial Number: %s\r +Data Format: %d (Log Ohms/Kelvin)\r +SetPoint Limit: %.1f (Kelvin)\r +Temperature coefficient: 1 (Negative)\r +Number of Breakpoints: %d\r +\r +No. Units Temperature (K)\r +\r +''' + + def __init__(self, channel, serialno, model='CX-unknown'): + self.channel = channel + self.serialno = serialno + self.model = model + + def make_calib(self, run, meas, diflim=0.001, smooth=0): + '''make calibration + + algorithm: calculate pairwise difference of samples 0,1,2 + the two samples with the biggest difference are omitted for each temperature, + or, if the biggest difference is < diflim, the average is used + ''' + dif = [0,0,0] + for j in range(3): + dif[j] = compare_calib(meas.ref[(j+1)%3], meas.tst[(j+1)%3], meas.ref[(j+2)%3], meas.tst[(j+2)%3]) + refbest = np.zeros(meas.n) + tstbest = np.zeros(meas.n) + m = 0 + for i in range(meas.n): + jbest = 2 + for j in range(3): + if abs(dif[(j+1) % 3][i]) < abs(dif[j][i]) > abs(dif[(j+2) % 3][i]): + jbest = j + break + if abs(dif[jbest][i]) > diflim: + refbest[i] = meas.ref[jbest][i] + tstbest[i] = meas.tst[jbest][i] + m += 1 + else: + refbest[i] = sum(meas.ref[j][i] for j in range(3)) / 3.0 + tstbest[i] = sum(meas.tst[j][i] for j in range(3)) / 3.0 + print(dict(averaged=meas.n-m, selected=m)) + self.rcal, self.tcal = make_calib(run.rref, run.tref, refbest, tstbest, run.t_points, smooth=smooth) + return (self.rcal, self.tcal) + + def write(self, rc, tc, logR=True, logT=False, path='%s/%s.340', caldate=None): + '''write a calibration curve in LakeShores 340 format + + logT, logR: use logT/logT for output + path: for output, might contain %s to be replaced by the sensor serial no + caldate: date of measurement (default: today) + ''' + try: + dataformat = {(True,True): 5, (True,False): 4, (False,False): 3}[(logR,logT)] + except KeyError: + raise ValueError("logT without logR is not possible") + if caldate is None: + caldate = time.strftime("%Y-%m-%d") + try: + path = path % (caldate, self.serialno) + except TypeError: + pass + try: + path = path % self.serialno + except TypeError: + pass + if not os.path.isdir(os.path.dirname(path)): + os.makedirs(os.path.dirname(path)) + self.outputpath = path + self.outputkind = 'z340' + if logT: + tc = nplog(tc) + if logR: + rc = nplog(rc) + for arr in (tc, rc): + if np.all(np.diff(arr) > 0) or np.all(np.diff(arr) < 0): + continue + raise ValueError('ERROR: data for %s is not monotonic' % self.serialno) + with open(path, 'w') as f: + f.write(self.HEADER % (self.model, caldate, self.serialno, dataformat, tc[-1], len(tc))) + for i,(r,t) in enumerate(sorted(zip(rc, tc))): + f.write("%3d %#10.7g %#10.7g\r\n" % (i+1,r,t)) + +class CalibRun(object): + def __init__(self, + sensors, # a list of sensors + t_points=(1.0, 1.2) + logrange(1.4, 310, n=195) + (330,), + caldate=None, + logT=False, + logR=True, + calib_data_file='calib_data/calib%s_p%d_c%d.dat', + outputpath='%s/%s.340'): + + self.refsensor = sensors[0] + self.sensors = sensors[1:] + self.sensdict = dict((s.serialno, s) for s in sensors) + self.t_points = t_points + self.calib_data_file = calib_data_file + self.outputoptions = dict(caldate=caldate, logT=logT, logR=logR, path=outputpath) + + def make(self, diflim=0.001, smoothref=1e-7, smoothtst=1e-7, writeFile=True): + '''write optimized calibration curves, selecting the best samples for each temperatures + ''' + self.rref, self.tref = read_curve(self.refsensor.serialno, 'lsdat') + self.rref, self.tref = smooth(self.rref, self.tref, smooth=smoothref) + + for sensor in self.sensors: + sensor.meas = Measurement(self, sensor) + rc, tc = sensor.make_calib(self, sensor.meas, diflim, smoothtst) + if writeFile: + try: + sensor.write(rc, tc, **self.outputoptions) + except ValueError as e: + print(e) \ No newline at end of file