add zcalib.py

+ fix .gitignore
This commit is contained in:
l_samenv
2022-08-19 15:59:02 +02:00
parent f3ae053320
commit 3d7bd3b169
2 changed files with 369 additions and 3 deletions

350
calib_scripts/zcalib.py Normal file
View File

@ -0,0 +1,350 @@
'''
utilities for creating cernox calibration curves
'''
import numpy as np
import matplotlib.pyplot as plt
from scipy.interpolate import splrep, splev
import math
from glob import glob
import os
import time
nplog = np.vectorize(math.log10)
npexp = np.vectorize(lambda x:10 ** x)
class StdFilter(object):
'''filter used for reading columns'''
def __init__(self, colnums = None, logformat = None):
if colnums is None:
colnums = (0,1)
self.colnums = colnums
self.logformat = logformat
self.output = [[] for i in colnums]
def parse(self, line):
'''get numbers from a line and put them to self.output'''
values = []
row = line.split()
try:
for c in self.colnums:
values.append(float(row[c]))
except (IndexError, ValueError):
# print('SKIP: %s' % line.strip())
return
self.posttreat(values)
def posttreat(self, values):
'''post treatment, mainly converting from log'''
if self.logformat:
values = values[:]
for c in self.logformat:
values[c] = 10 ** values[c]
for out, val in zip(self.output, values):
out.append(val)
class Filter340(StdFilter):
'''filter for LakeShore *.340 files'''
def __init__(self):
self.logformat = None
self.header = True
self.output = [[], []]
def parse(self, line):
'''scan header for data format'''
if self.header:
if line.startswith("Data Format"):
dataformat = line.split(":")[1].strip()[0]
if dataformat == '4':
self.logformat = [0]
elif dataformat == '5':
self.logformat = [0, 1]
elif line.startswith("No."):
self.header = False
# print('HDR: %s' % line.strip())
return
try:
no, r, t = line.split()
self.posttreat([float(r),float(t)])
except ValueError:
# print('SKIP: %s' % line.strip())
return
except OverflowError:
# print('OVERFLOW:', no, r, t)
pass
KINDS = {
# lakeshore 340 format
"z340": dict(path="*/%s.340", filter=Filter340),
"ls340": dict(path="/afs/psi.ch/project/SampleEnvironment/SE_internal/Thermometer_calibs/*/*/%s.340:/home/l_samenv/sea/tcl/calcurves/%s.340", filter=Filter340),
# markus zollikers *.inp calcurve format
"inp": dict(path="/home/l_samenv/sea/tcl/calcurves/%s.inp", filter=StdFilter),
# format from sea/tcl/startup/calib_ext.tcl
"zdat": dict(path="/home/l_samenv/sea/calib_scripts/calib_data/%s.dat", filter=StdFilter, args=dict(colnums=(1,2))),
# lakeshore raw data *.dat format
"lsdat": dict(path="/afs/psi.ch/project/SampleEnvironment/SE_internal/Thermometer_calibs/*/*/%s.dat", filter=StdFilter, args=dict(colnums=(1,0))),
}
def read_curve(filename, kind, instance=None, **filterargs):
'''general curve reading'''
path = '%s'
filelist = []
for path in KINDS[kind]["path"].split(':'):
filelist += glob(path % filename)
filelist = sorted(filelist)
if instance is None :
if len(filelist) > 1:
for i,file in enumerate(filelist):
print(i,file)
raise ValueError('instance number needed')
instance = 0
if len(filelist) > 0:
filename = filelist[instance]
try:
args = KINDS[kind]["args"]
except KeyError:
args = {}
args.update(filterargs)
try:
filter = KINDS[kind]["filter"](**args)
except KeyError:
filter = StdFilter(**args)
print(kind, "READ %s" % filename)
with open(filename) as f:
curves = [[] for c in filter.output]
for line in f:
filter.parse(line)
return [np.asarray(c) for c in filter.output]
def convert_res(res1, res2, resc1, log1=True, log2=True, smooth=0):
'''interpolate (by default in log scale) the curve res1, res2 from sensors 1,2
resc1: input for values with sensor 1
return nvalues: interpolated values for sensor 2
may also be used for converting resistivity to T or vice versa
'''
if log1:
res1 = nplog(res1)
resc1 = nplog(resc1)
if log2:
res2 = nplog(res2)
if res1[-1] < res1[0]:
res1 = res1[::-1]
res2 = res2[::-1]
for r1,r2 in zip(res1[:-1], res1[1:]):
if r1 >= r2:
print(res1)
raise ValueError('not increasing X')
fun = splrep(res1, res2, s=smooth)
# extrapolation outside the range only linear!
beg, end = res1[0], res1[-1]
slopebeg = splev(beg, fun, der=1)
slopeend = splev(end, fun, der=1)
resc2 = np.where(resc1 < beg, res2[0] + slopebeg * (resc1 - beg),
np.where(resc1 > end, res2[-1] + slopeend * (resc1 - end), splev(resc1, fun)))
if log2:
resc2 = npexp(resc2)
return resc2
def compare_calib(res1, res2, test1, test2, log1=True, log2=True):
'''compare two curves
the number of points returned is len(test1)
'''
calc2 = convert_res(res1, res2, test1, log1, log2)
dif = (calc2 - test2) / test2
for i, d in enumerate(dif):
if test1[i] < 2:
print(test1[i], test2[i], calc2[i], dif[i])
elif abs(d) > 0.1:
print(test1[i], test2[i], calc2[i], dif[i])
return dif
def make_calib(r_dat, t_dat, r_ref, r_test, t_points, smooth=0):
'''create calibration curve
r_dat,t_dat: data from known sensor
r_ref, r_test: resisitvities measured
t_points: points to be used for the output.
'''
t_cal = np.asarray(t_points)
t_cal.sort()
r_dat_cal = convert_res(t_dat, r_dat, t_cal)
r_cal = convert_res(r_ref, r_test, r_dat_cal, smooth=smooth)
return r_cal, t_cal
def logrange(first, last, decade_step=None, n=None, include_last=True):
if n == 0:
result = ()
else:
loginterval = math.log10(last) - math.log10(first)
if n is None:
if isinstance(decade_step, int):
raise ValueError('decade_step must be a float. you may also use n=<number_of_steps>')
if decade_step is None:
decade_step = 0.1
n = int(round(abs(loginterval) / abs(decade_step)))
else:
if decade_step is not None:
raise ValueError('can not specify both n and decade_step')
if not isinstance(n, int):
raise ValueError('n must be an integer. you may also use decade_step')
decade_step = loginterval / n
offset = math.log10(first)
result = tuple(10 ** (offset + i * decade_step) for i in range(n))
if include_last:
result += (last,)
return result
def smooth(xx, yy, smooth):
if xx[0] > xx[-1]:
xxl = -nplog(xx)
else:
xxl = nplog(xx)
return (xx, npexp(splev(xxl, splrep(xxl, nplog(yy), s=smooth))))
class Measurement(object):
def __init__(self, run=None, sensor=None):
self.ref = [None] * 3
self.tst = [None] * 3
if run is not None:
self.read(run, sensor)
def read(self, run, sensor):
sensor.caldat_file = [None] * 3
for j in range(3):
sensor.caldat_file[j] = run.calib_data_file % (
run.outputoptions['caldate'], j, sensor.channel)
self.ref[j], self.tst[j] = read_curve(sensor.caldat_file[j], 'zdat')
self.n = min([len(self.ref[j]) for j in range(3)])
def reduced(self, step):
meas = Measurement()
meas.n = ((self.n - 1) / step) + 1
indices = np.round(np.linspace(0,self.n-1,meas.n),0).astype(int)
for j in range(3):
meas.ref[j] = self.ref[j][indices]
meas.tst[j] = self.tst[j][indices]
return meas
class Sensor(object):
HEADER = '''Sensor Model: %s %s\r
Serial Number: %s\r
Data Format: %d (Log Ohms/Kelvin)\r
SetPoint Limit: %.1f (Kelvin)\r
Temperature coefficient: 1 (Negative)\r
Number of Breakpoints: %d\r
\r
No. Units Temperature (K)\r
\r
'''
def __init__(self, channel, serialno, model='CX-unknown'):
self.channel = channel
self.serialno = serialno
self.model = model
def make_calib(self, run, meas, diflim=0.001, smooth=0):
'''make calibration
algorithm: calculate pairwise difference of samples 0,1,2
the two samples with the biggest difference are omitted for each temperature,
or, if the biggest difference is < diflim, the average is used
'''
dif = [0,0,0]
for j in range(3):
dif[j] = compare_calib(meas.ref[(j+1)%3], meas.tst[(j+1)%3], meas.ref[(j+2)%3], meas.tst[(j+2)%3])
refbest = np.zeros(meas.n)
tstbest = np.zeros(meas.n)
m = 0
for i in range(meas.n):
jbest = 2
for j in range(3):
if abs(dif[(j+1) % 3][i]) < abs(dif[j][i]) > abs(dif[(j+2) % 3][i]):
jbest = j
break
if abs(dif[jbest][i]) > diflim:
refbest[i] = meas.ref[jbest][i]
tstbest[i] = meas.tst[jbest][i]
m += 1
else:
refbest[i] = sum(meas.ref[j][i] for j in range(3)) / 3.0
tstbest[i] = sum(meas.tst[j][i] for j in range(3)) / 3.0
print(dict(averaged=meas.n-m, selected=m))
self.rcal, self.tcal = make_calib(run.rref, run.tref, refbest, tstbest, run.t_points, smooth=smooth)
return (self.rcal, self.tcal)
def write(self, rc, tc, logR=True, logT=False, path='%s/%s.340', caldate=None):
'''write a calibration curve in LakeShores 340 format
logT, logR: use logT/logT for output
path: for output, might contain %s to be replaced by the sensor serial no
caldate: date of measurement (default: today)
'''
try:
dataformat = {(True,True): 5, (True,False): 4, (False,False): 3}[(logR,logT)]
except KeyError:
raise ValueError("logT without logR is not possible")
if caldate is None:
caldate = time.strftime("%Y-%m-%d")
try:
path = path % (caldate, self.serialno)
except TypeError:
pass
try:
path = path % self.serialno
except TypeError:
pass
if not os.path.isdir(os.path.dirname(path)):
os.makedirs(os.path.dirname(path))
self.outputpath = path
self.outputkind = 'z340'
if logT:
tc = nplog(tc)
if logR:
rc = nplog(rc)
for arr in (tc, rc):
if np.all(np.diff(arr) > 0) or np.all(np.diff(arr) < 0):
continue
raise ValueError('ERROR: data for %s is not monotonic' % self.serialno)
with open(path, 'w') as f:
f.write(self.HEADER % (self.model, caldate, self.serialno, dataformat, tc[-1], len(tc)))
for i,(r,t) in enumerate(sorted(zip(rc, tc))):
f.write("%3d %#10.7g %#10.7g\r\n" % (i+1,r,t))
class CalibRun(object):
def __init__(self,
sensors, # a list of sensors
t_points=(1.0, 1.2) + logrange(1.4, 310, n=195) + (330,),
caldate=None,
logT=False,
logR=True,
calib_data_file='calib_data/calib%s_p%d_c%d.dat',
outputpath='%s/%s.340'):
self.refsensor = sensors[0]
self.sensors = sensors[1:]
self.sensdict = dict((s.serialno, s) for s in sensors)
self.t_points = t_points
self.calib_data_file = calib_data_file
self.outputoptions = dict(caldate=caldate, logT=logT, logR=logR, path=outputpath)
def make(self, diflim=0.001, smoothref=1e-7, smoothtst=1e-7, writeFile=True):
'''write optimized calibration curves, selecting the best samples for each temperatures
'''
self.rref, self.tref = read_curve(self.refsensor.serialno, 'lsdat')
self.rref, self.tref = smooth(self.rref, self.tref, smooth=smoothref)
for sensor in self.sensors:
sensor.meas = Measurement(self, sensor)
rc, tc = sensor.make_calib(self, sensor.meas, diflim, smoothtst)
if writeFile:
try:
sensor.write(rc, tc, **self.outputoptions)
except ValueError as e:
print(e)