diff --git a/cfg/ls340_cfg.py b/cfg/ls340_cfg.py index 3d60963..e039854 100644 --- a/cfg/ls340_cfg.py +++ b/cfg/ls340_cfg.py @@ -4,33 +4,22 @@ Node('ls340test.psi.ch', ) Mod('io', - 'frappy_psi.lakeshore.Ls340IO', + 'frappy_psi.lakeshore.IO340', 'communication to ls340', - uri='tcp://ldmprep56-ts:3002' + uri='tcp://localhost:7777' ) +Mod('dev', + 'frappy_psi.lakeshore.Device340', + 'device for calcurve', + io='io', + curve_handling=True, + ) Mod('T', - 'frappy_psi.lakeshore.TemperatureLoop340', - 'sample temperature', - output_module='Heater', - target=Param(max=470), - io='io', - channel='B' - ) - -Mod('T_cold_finger', 'frappy_psi.lakeshore.Sensor340', - 'cold finger temperature', - io='io', - channel='A' - ) - -Mod('Heater', - 'frappy_psi.lakeshore.HeaterOutput', - 'heater output', - channel='B', - io='io', - resistance=25, - max_power=50, - current=1 + 'sample temperature', + # output_module='Heater', + device='dev', + channel='A', + calcurve='x29746', ) diff --git a/cfg/main/ori7test_cfg.py b/cfg/main/ori7test_cfg.py new file mode 100644 index 0000000..578073f --- /dev/null +++ b/cfg/main/ori7test_cfg.py @@ -0,0 +1,17 @@ +from frappy_psi.ccracks import Rack + +Node('ori7test.psi.ch', + 'ORI7 test', + 'tcp://5000' + ) + +rack = Rack(Mod) + +with rack.lakeshore() as ls: + ls.sensor('Ts', channel='C', calcurve='x186350') + ls.loop('T', channel='B', calcurve='x174786') + ls.heater('htr', '100W', 100) + +rack.ccu(he=True, n2=True) + +rack.hepump() diff --git a/frappy/lib/units.py b/frappy/lib/units.py new file mode 100644 index 0000000..a52c102 --- /dev/null +++ b/frappy/lib/units.py @@ -0,0 +1,50 @@ +# ***************************************************************************** +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free Software +# Foundation; either version 2 of the License, or (at your option) any later +# version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# this program; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Module authors: +# Markus Zolliker +# +# ***************************************************************************** +"""handling of prefixes of physical units""" + +import re +import prefixed + +prefixed.SI_MAGNITUDE['u'] = 1e-6 # accept 'u' as replacement for 'ยต' + + +class NumberWithUnit: + def __init__(self, *units): + pfx = "|".join(prefixed.SI_MAGNITUDE) + unt = "|".join(units) + self.units = units + self.pattern = re.compile(rf'\s*([+-]?\d*\.?\d*(?:[eE][+-]?\d+)?\s*(?:{pfx})?)({unt})\s*$') + + def parse(self, value): + """parse and return number and value""" + match = self.pattern.match(value) + if not match: + raise ValueError(f'{value!r} can not be interpreted as a number with unit {",".join(self.units)}') + number, unit = match.groups() + return prefixed.Float(number), unit + + def getnum(self, value): + """parse and return value only""" + return self.parse(value)[0] + + +def format_with_unit(value, unit='', digits=3): + return f'{prefixed.Float(value):.{digits}H}{unit}' diff --git a/frappy_psi/calcurve.py b/frappy_psi/calcurve.py index 4083ff3..a0e2575 100644 --- a/frappy_psi/calcurve.py +++ b/frappy_psi/calcurve.py @@ -22,6 +22,7 @@ import os import re +from pathlib import Path from os.path import basename, dirname, exists, join import numpy as np @@ -31,13 +32,22 @@ from scipy.interpolate import PchipInterpolator, CubicSpline, PPoly # pylint: d from frappy.errors import ProgrammingError, RangeError from frappy.lib import clamp + +def identity(x): + return x + + +def exp10(x): + return 10 ** np.array(x) + + to_scale = { - 'lin': lambda x: x, - 'log': lambda x: np.log10(x), + 'lin': identity, + 'log': np.log10, } from_scale = { - 'lin': lambda x: x, - 'log': lambda x: 10 ** np.array(x), + 'lin': identity, + 'log': exp10, } TYPES = [ # lakeshore type, inp-type, loglog ('DT', 'si', False), # Si diode @@ -55,7 +65,7 @@ TYPES = [ # lakeshore type, inp-type, loglog OPTION_TYPE = { 'loglog': 0, # boolean - 'extrange': 2, # tuple(min T, max T for extrapolation + 'extrange': 2, # tuple(min T, max T) for extrapolation 'calibrange': 2, # tuple(min T, max T) } @@ -222,14 +232,6 @@ PARSERS = { } -def check(x, y, islog): - # check interpolation error - yi = y[:-2] + (x[1:-1] - x[:-2]) * (y[2:] - y[:-2]) / (x[2:] - x[:-2]) - if islog: - return sum((yi - y[1:-1]) ** 2) - return sum((np.log10(yi) - np.log10(y[1:-1])) ** 2) - - def get_curve(newscale, curves): """get curve from curve cache (converts not existing ones) @@ -247,6 +249,7 @@ def get_curve(newscale, curves): class CalCurve(HasOptions): EXTRAPOLATION_AMOUNT = 0.1 MAX_EXTRAPOLATION_FACTOR = 2 + filename = None # calibration file def __init__(self, calibspec=None, *, x=None, y=None, cubic_spline=True, **options): """calibration curve @@ -257,7 +260,7 @@ class CalCurve(HasOptions): [ | ][,= ...] for / as in parser arguments :param x, y: x and y arrays (given instead of calibspec) - :param cubic_split: set to False for always using Pchip interpolation + :param cubic_spline: set to False for always using Pchip interpolation :param options: options for parsers """ self.options = options @@ -265,26 +268,31 @@ class CalCurve(HasOptions): parser = StdParser() parser.xdata = x parser.ydata = y + self.calibname = 'custom' else: if x or y: raise ProgrammingError('can not give both calibspec and x,y ') sensopt = calibspec.split(',') calibname = sensopt.pop(0) - _, dot, ext = basename(calibname).rpartition('.') + self.calibname = basename(calibname) + head, dot, ext = self.calibname.rpartition('.') + if dot: + self.calibname = head kind = None - pathlist = os.environ.get('FRAPPY_CALIB_PATH', '').split(':') - pathlist.append(join(dirname(__file__), 'calcurves')) + pathlist = [Path(p.strip()) for p in os.environ.get('FRAPPY_CALIB_PATH', '').split(':')] + pathlist.append(Path(dirname(__file__)) / 'calcurves') for path in pathlist: # first try without adding kind - filename = join(path.strip(), calibname) - if exists(filename): + filename = path / calibname + if filename.exists(): kind = ext if dot else None break # then try adding all kinds as extension for nam in calibname, calibname.upper(), calibname.lower(): for kind in PARSERS: - filename = join(path.strip(), '%s.%s' % (nam, kind)) + filename = path / f'{nam}.{kind}' if exists(filename): + self.filename = filename break else: continue @@ -328,6 +336,7 @@ class CalCurve(HasOptions): not_incr_idx = np.argwhere(x[1:] <= x[:-1]) if len(not_incr_idx): raise RangeError('x not monotonic at x=%.4g' % x[not_incr_idx[0]]) + self.ptc = y[-1] > y[0] self.x = {parser.xscale: x} self.y = {parser.yscale: y} @@ -344,8 +353,7 @@ class CalCurve(HasOptions): self.convert_x = to_scale[newscale] self.convert_y = from_scale[newscale] self.calibrange = self.options.get('calibrange') - dirty = set() - self.extra_points = False + self.extra_points = (0, 0) self.cutted = False if self.calibrange: self.calibrange = sorted(self.calibrange) @@ -371,7 +379,6 @@ class CalCurve(HasOptions): self.y = {newscale: y} ibeg = 0 iend = len(x) - dirty.add('xy') else: self.extra_points = ibeg, len(x) - iend else: @@ -493,13 +500,48 @@ class CalCurve(HasOptions): except IndexError: return defaultx - def export(self, logformat=False, nmax=199, yrange=None, extrapolate=True, xlimits=None): + def interpolation_error(self, x0, x1, y0, y1, funx, funy, relerror, return_tuple=False): + """calcualte interpoaltion error + + :param x0: start of interval + :param x1: end of interval + :param y0: y at start of interval + :param y1: y at end of interval + :param funx: function to convert x from exported scale to internal scale + :param funy: function to convert y from internal scale to exported scale + :param relerror: True when the exported y scale is linear + :param return_tuple: True: return interpolation error as a tuple with two values + (without and with 3 additional points) + False: return one value without additional points + :return: relative deviation + """ + xspace = np.linspace(x0, x1, 9) + x = funx(xspace) + yr = self.spline(x) + yspline = funy(yr) + yinterp = y0 + np.linspace(0.0, y1 - y0, 9) + # difference between spline (at m points) and liner interpolation + diff = np.abs(yspline - yinterp) + # estimate of interpolation error with 4 sections: + # difference between spline (at m points) and linear interpolation between neighboring points + + if relerror: + fact = 2 / (np.abs(y0) + np.abs(y1)) # division by zero can not happen, as y0 and y1 can not both be zero + else: + fact = 2.3 # difference is in log10 -> multiply by 1 / log10(e) + result = np.max(diff, axis=0) * fact + if return_tuple: + diff2 = np.abs(0.5 * (yspline[:-2:2] + yspline[2::2]) - funy(yr[1:-1:2])) + return result, np.max(diff2, axis=0) * fact + return result + + def export(self, logformat=False, nmax=199, yrange=None, extrapolate=True, xlimits=None, nmin=199): """export curve for downloading to hardware :param nmax: max number of points. if the number of given points is bigger, the points with the lowest interpolation error are omitted - :param logformat: a list with two elements of None, True or False - True: use log, False: use line, None: use log if self.loglog + :param logformat: a list with two elements of None, True or False for x and y + True: use log, False: use lin, None: use log if self.loglog values None are replaced with the effectively used format False / True are replaced by [False, False] / [True, True] default is False @@ -507,25 +549,26 @@ class CalCurve(HasOptions): :param extrapolate: a flag indicating whether the curves should be extrapolated to the preset extrapolation range :param xlimits: max x range + :param nmin: minimum number of points :return: numpy array with 2 dimensions returning the curve """ if logformat in (True, False): - logformat = [logformat, logformat] + logformat = (logformat, logformat) + self.logformat = list(logformat) try: scales = [] for idx, logfmt in enumerate(logformat): if logfmt and self.lin_forced[idx]: raise ValueError('%s must contain positive values only' % 'xy'[idx]) - logformat[idx] = linlog = self.loglog if logfmt is None else logfmt + self.logformat[idx] = linlog = self.loglog if logfmt is None else logfmt scales.append('log' if linlog else 'lin') xscale, yscale = scales except (TypeError, AssertionError): - raise ValueError('logformat must be a 2 element list or a boolean') + raise ValueError('logformat must be a 2 element sequence or a boolean') - x = self.spline.x[1:-1] # raw units, excluding extrapolated points - x1, x2 = xmin, xmax = x[0], x[-1] - y1, y2 = sorted(self.spline([x1, x2])) + xr = self.spline.x[1:-1] # raw units, excluding extrapolated points + x1, x2 = xmin, xmax = xr[0], xr[-1] if extrapolate and not yrange: yrange = self.exty @@ -535,42 +578,100 @@ class CalCurve(HasOptions): lim = to_scale[self.scale](xlimits) xmin = clamp(xmin, *lim) xmax = clamp(xmax, *lim) + # start and end index of calibrated range + ibeg, iend = self.extra_points[0], len(xr) - self.extra_points[1] if xmin != x1 or xmax != x2: - ibeg, iend = np.searchsorted(x, (xmin, xmax)) - if abs(x[ibeg] - xmin) < 0.1 * (x[ibeg + 1] - x[ibeg]): + i, j = np.searchsorted(xr, (xmin, xmax)) + if abs(xr[i] - xmin) < 0.1 * (xr[i + 1] - xr[i]): # remove first point, if close - ibeg += 1 - if abs(x[iend - 1] - xmax) < 0.1 * (x[iend - 1] - x[iend - 2]): + i += 1 + if abs(xr[j - 1] - xmax) < 0.1 * (xr[j - 1] - xr[j - 2]): # remove last point, if close - iend -= 1 - x = np.concatenate(([xmin], x[ibeg:iend], [xmax])) - y = self.spline(x) + j -= 1 + offset = i - 1 + xr = np.concatenate(([xmin], xr[i:j], [xmax])) + ibeg = max(0, ibeg - offset) + iend = min(len(xr), iend - offset) + + yr = self.spline(xr) # convert to exported scale - if xscale != self.scale: - x = to_scale[xscale](from_scale[self.scale](x)) - if yscale != self.scale: - y = to_scale[yscale](from_scale[self.scale](y)) - - # reduce number of points, if needed - n = len(x) - i, j = 1, n - 1 # index range for calculating interpolation deviation - deviation = np.zeros(n) - while True: - # calculate interpolation error when a single point is omitted - ym = y[i-1:j-1] + (x[i:j] - x[i-1:j-1]) * (y[i+1:j+1] - y[i-1:j-1]) / (x[i+1:j+1] - x[i-1:j-1]) - if yscale == 'log': - deviation[i:j] = np.abs(ym - y[i:j]) + if xscale == self.scale: + xbwd = identity + x = xr + else: + if self.scale == 'log': + xfwd, xbwd = from_scale[self.scale], to_scale[self.scale] else: - deviation[i:j] = np.abs(ym - y[i:j]) / (np.abs(ym + y[i:j]) + 1e-10) - if n <= nmax: - break - idx = np.argmin(deviation[1:-1]) + 1 # find index of the smallest error - y = np.delete(y, idx) - x = np.delete(x, idx) - deviation = np.delete(deviation, idx) - n -= 1 - # index range to recalculate - i, j = max(1, idx - 1), min(n - 1, idx + 1) - self.deviation = deviation # for debugging purposes + xfwd, xbwd = to_scale[xscale], from_scale[xscale] + x = xfwd(xr) + if yscale == self.scale: + yfwd = identity + y = yr + else: + if self.scale == 'log': + yfwd = from_scale[self.scale] + else: + yfwd = to_scale[yscale] + y = yfwd(yr) + + self.deviation = None + nmin = min(nmin, nmax) + n = len(x) + relerror = yscale == 'lin' + if len(x) > nmax: + # reduce number of points, if needed + i, j = 1, n - 1 # index range for calculating interpolation deviation + deviation = np.zeros(n) + while True: + deviation[i:j] = self.interpolation_error( + x[i-1:j-1], x[i+1:j+1], y[i-1:j-1], y[i+1:j+1], + xbwd, yfwd, relerror) + # calculate interpolation error when a single point is omitted + if n <= nmax: + break + idx = np.argmin(deviation[1:-1]) + 1 # find index of the smallest error + y = np.delete(y, idx) + x = np.delete(x, idx) + deviation = np.delete(deviation, idx) + n = len(x) + # index range to recalculate + i, j = max(1, idx - 1), min(n - 1, idx + 1) + self.deviation = deviation # for debugging purposes + elif n < nmin: + if ibeg + 1 < iend: + diff1, diff4 = self.interpolation_error( + x[ibeg:iend - 1], x[ibeg + 1:iend], y[ibeg:iend - 1], y[ibeg + 1:iend], + xbwd, yfwd, relerror, return_tuple=True) + dif_target = 1e-4 + sq4 = np.sqrt(diff4) * 4 + sq1 = np.sqrt(diff1) + offset = 0.49 + n_mid = nmax - len(x) + iend - ibeg - 1 + # iteration to find a dif target resulting in no more than nmax points + while True: + scale = 1 / np.sqrt(dif_target) + # estimate number of intermediate points (float!) needed to reach dif_target + # number of points estimated from the result of the interpolation error with 4 sections + n4 = np.maximum(1, sq4 * scale) + # number of points estimated from the result of the interpolation error with 1 section + n1 = np.maximum(1, sq1 * scale) + # use n4 where n4 > 4, n1, where n1 < 1 and a weighted average in between + nn = np.select([n4 > 4, n1 > 1], + [n4, (n4 * (n1 - 1) + n1 * (4 - n4)) / (3 + n1 - n4)], n1) + n_tot = np.sum(np.rint(nn + offset)) + extra = n_tot - n_mid + if extra <= 0: + break + dif_target *= (n_tot / n_mid) ** 2 + + xnew = [x[:ibeg]] + for x0, x1, ni in zip(x[ibeg:iend-1], x[ibeg+1:iend], np.rint(nn + offset)): + xnew.append(np.linspace(x0, x1, int(ni) + 1)[:-1]) + xnew.append(x[iend-1:]) + x = np.concatenate(xnew) + y = yfwd(self.spline(xbwd(x))) + # for debugging purposes: + self.deviation = self.interpolation_error(x[:-1], x[1:], y[:-1], y[1:], xbwd, yfwd, relerror) + return np.stack([x, y], axis=1) diff --git a/frappy_psi/ccracks.py b/frappy_psi/ccracks.py new file mode 100644 index 0000000..0469426 --- /dev/null +++ b/frappy_psi/ccracks.py @@ -0,0 +1,125 @@ +import os +from configparser import ConfigParser + + +class Lsc: + def __init__(self, modfactory, ls_uri, ls_ioname='lsio', ls_devname='ls', ls_model='336', **kwds): + self.modfactory = Mod = modfactory + self.model = ls_model + self.ioname = ls_ioname + self.devname = ls_devname + self.io = Mod(self.ioname, cls=f'frappy_psi.lakeshore.IO{self.model}', + description='comm. to lakeshore in cc rack', + uri=ls_uri) + self.dev = Mod(self.devname, cls=f'frappy_psi.lakeshore.Device{self.model}', + description='lakeshore in cc rack', io=self.ioname, curve_handling=True) + self.loops = {} + self.outputs = {} + + def sensor(self, name, channel, calcurve, **kwds): + Mod = self.modfactory + kwds.setdefault('cls', f'frappy_psi.lakeshore.Sensor{self.model}') + kwds.setdefault('description', f'T sensor {name}') + return Mod(name, channel=channel, calcurve=calcurve, + io=self.ioname, device=self.devname, **kwds) + + def loop(self, name, channel, calcurve, **kwds): + Mod = self.modfactory + kwds.setdefault('cls', f'frappy_psi.lakeshore.Loop{self.model}') + kwds.setdefault('description', f'T loop {name}') + mod = Mod(name, channel=channel, calcurve=calcurve, + io=self.ioname, device=self.devname, **kwds) + self.loops[name] = mod + return mod + + def heater(self, name, max_heater, resistance, output_no=1, **kwds): + Mod = self.modfactory + if output_no == 1: + kwds.setdefault('cls', f'frappy_psi.lakeshore.MainOutput{self.model}') + elif output_no == 2: + kwds.setdefault('cls', f'frappy_psi.lakeshore.SecondaryOutput{self.model}') + else: + return + kwds.setdefault('description', '') + mod = Mod(name, max_heater=max_heater, resistance=resistance, + io=self.ioname, device=self.devname, **kwds) + self.outputs[name] = mod + return mod + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + outmodules = dict(self.outputs) + for name, loop in self.loops.items(): + outname = loop.get('output_module') + if outname: + out = outmodules.pop(outname, None) + if not out: + raise KeyError(f'{outname} is not a output module in this lakeshore') + else: + if not outmodules: + raise KeyError(f'{name} needs an output module on this lakeshore') + outname = list(outmodules)[0] + out = outmodules.pop(outname) + loop['output_module'] = outname + if not out['description']: + out['description'] = f'heater for {outname}' + + +class CCU: + def __init__(self, Mod, ccu_uri, ccu_ioname='ccuio', ccu_devname='ccu', he=None, n2=None, **kwds): + self.ioname = ccu_ioname + self.devname = ccu_devname + Mod(self.ioname, 'frappy_psi.ccu4.CCU4IO', + 'comm. to CCU4', uri=ccu_uri) + if he: + if not isinstance(he, str): # e.g. True + he = 'He_lev' + Mod(he, cls='frappy_psi.ccu4.HeLevel', + description='the He Level', io=self.ioname) + if n2: + if isinstance(n2, str): + n2 = n2.split(',') + else: # e.g. True + n2 = [] + n2, valve, upper, lower = n2 + ['N2_lev', 'N2_valve', 'N2_upper', 'N2_lower'][len(n2):] + print(n2, valve, upper, lower) + Mod(n2, cls='frappy_psi.ccu4.N2Level', + description='the N2 Level', io=self.ioname, + valve=valve, upper=upper, lower=lower) + Mod(valve, cls='frappy_psi.ccu4.N2FillValve', + description='LN2 fill valve', io=self.ioname) + Mod(upper, cls='frappy_psi.ccu4.N2TempSensor', + description='upper LN2 sensor') + Mod(lower, cls='frappy_psi.ccu4.N2TempSensor', + description='lower LN2 sensor') + + +class HePump: + def __init__(self, Mod, hepump_uri, hepump_io='hepump_io', hemotname='hepump_mot', **kwds): + Mod(hepump_io, 'frappy_psi.trinamic.BytesIO', 'He pump connection', uri=hepump_uri) + Mod(hemotname, 'frappy_psi.trinamic.Motor', 'He pump valve motor', io=hepump_io) + + +class Rack: + rackfile = '/home/l_samenv/.config/racks.ini' + + def __init__(self, modfactory, **kwds): + self.modfactory = modfactory + parser = ConfigParser() + parser.optionxform = str + parser.read([self.rackfile]) + kwds.update(parser.items(os.environ['Instrument'])) + self.kwds = kwds + + def lakeshore(self): + return Lsc(self.modfactory, **self.kwds) + + def ccu(self, **kwds): + kwds.update(self.kwds) + return CCU(self.modfactory, **kwds) + + def hepump(self): + return HePump(self.modfactory, **self.kwds) + diff --git a/frappy_psi/ccu4.py b/frappy_psi/ccu4.py index 4fb4176..5a179f7 100644 --- a/frappy_psi/ccu4.py +++ b/frappy_psi/ccu4.py @@ -28,7 +28,6 @@ from frappy.core import HasIO, Parameter, Command, Readable, Writable, Drivable, Property, StringIO, BUSY, IDLE, WARN, ERROR, DISABLED, Attached from frappy.datatypes import BoolType, EnumType, FloatRange, StructOf, \ StatusType, IntRange, StringType, TupleOf -from frappy.dynamic import Pinata from frappy.errors import CommunicationFailedError from frappy.states import HasStates, status_code, Retry @@ -42,7 +41,7 @@ class CCU4IO(StringIO): # for completeness: (not needed, as it is the default) end_of_line = '\n' # on connect, we send 'cid' and expect a reply starting with 'CCU4' - identification = [('cid', r'CCU4.*')] + identification = [('cid', r'cid=CCU4.*')] class CCU4Base(HasIO): @@ -144,7 +143,7 @@ class Valve(CCU4Base, Writable): self.command(**self._close_command) def read_status(self): - state = self.command(self._query_state) + state = int(self.command(**self._query_state)) self.value, status = self.STATE_MAP[state] return status @@ -174,14 +173,14 @@ class N2TempSensor(Readable): value = Parameter('LN2 T sensor', FloatRange(unit='K'), default=0) -class N2Level(CCU4Base, Pinata, Readable): +class N2Level(CCU4Base, Readable): valve = Attached(Writable, mandatory=False) lower = Attached(Readable, mandatory=False) upper = Attached(Readable, mandatory=False) value = Parameter('vessel state', EnumType(empty=0, ok=1, full=2)) - status = Parameter(datatype=StatusType(Readable, 'BUSY')) - mode = Parameter('auto mode', EnumType(A), readonly=False) + status = Parameter(datatype=StatusType(Readable, 'DISABLED', 'BUSY')) + mode = Parameter('auto mode', EnumType(A), readonly=False, default=A.manual) threshold = Parameter('threshold triggering start/stop filling', FloatRange(unit='K'), readonly=False) @@ -206,15 +205,6 @@ class N2Level(CCU4Base, Pinata, Readable): 5: (WARN, 'empty'), } - def scanModules(self): - for modname, name in self.names.items(): - if name: - sensor_name = name.replace('$', self.name) - self.setProperty(modname, sensor_name) - yield sensor_name, { - 'cls': N2FillValve if modname == 'valve' else N2TempSensor, - 'description': f'LN2 {modname} T sensor'} - def initialReads(self): self.command(nav=1) # tell CCU4 to activate LN2 sensor readings super().initialReads() @@ -280,17 +270,19 @@ class N2Level(CCU4Base, Pinata, Readable): @Command() def fill(self): + """start filling""" self.mode = A.auto - self.io.write(nc=1) + self.command(nc=1) @Command() def stop(self): + """stop filling""" if self.mode == A.auto: # set to watching self.command(nc=3) else: # set to off - self.io.write(nc=0) + self.command(nc=0) class FlowPressure(CCU4Base, Readable): diff --git a/frappy_psi/lakeshore.py b/frappy_psi/lakeshore.py index 105322b..c2c1a59 100644 --- a/frappy_psi/lakeshore.py +++ b/frappy_psi/lakeshore.py @@ -10,313 +10,1233 @@ # details. # # You should have received a copy of the GNU General Public License along with -# this program; if not, write to the Free Software Foundation, Inc., # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # # Module authors: # Oksana Shliakhtun +# Markus Zolliker # ***************************************************************************** -import math +"""driver for various lakeshore temperature monitors/controllers""" -from frappy.core import Readable, Parameter, IntRange, FloatRange, \ - StringIO, HasIO, StringType, Property, Writable, Drivable, IDLE, ERROR, \ - StructOf, WARN, Done, BoolType, Enum -from frappy.errors import RangeError +import time +import math +import random +import threading +import numpy as np +from numpy.testing import assert_approx_equal + +from frappy.core import Module, Readable, Parameter, Property, \ + HasIO, StringIO, Writable, IDLE, ERROR, BUSY, DISABLED, nopoll, Attached +from frappy.datatypes import IntRange, FloatRange, StringType, \ + BoolType, ValueType, StatusType +from frappy.errors import CommunicationFailedError, ConfigError, \ + HardwareError, DisabledError, ImpossibleError, secop_error, SECoPError +from frappy.lib.units import NumberWithUnit, format_with_unit +from frappy.lib import formatStatusBits from frappy_psi.convergence import HasConvergence from frappy.mixins import HasOutputModule, HasControlledBy +from frappy.extparams import StructParam +from frappy_psi.calcurve import CalCurve -class Ls340IO(StringIO): - """communication with 340CT""" - end_of_line = '\r' - wait_before = 0.05 - identification = [('*IDN?', r'LSCI,MODEL340,.*')] +class IO(StringIO): + """IO classes of most LakeShore models will inherit from this""" + end_of_line = '\n' + wait_before = -1 + # ValueType(str): accept and convert also numbers + model = Property('model name', ValueType(str), value='') + + def initModule(self): + if self.wait_before < 0: + # wait before is not needed when connected via LAN + # as the port number is always 7777, the following is a quite + # good guess + self.wait_before = 0 if ':7777' in self.uri else 0.05 + super().initModule() + + def checkHWIdent(self): + self.identification = [('*IDN?', f'LSCI,MODEL{self.model},.*')] + super().checkHWIdent() -class LakeShore(HasIO): - def set_par(self, cmd, *args): - head = ','.join([cmd] + [a if isinstance(a, str) else f'{a:g}' for a in args]) - tail = cmd.replace(' ', '? ') - reply = self.communicate(f'{head};{tail}') - result = [] - for num in reply.split(','): - try: - result.append(float(num)) - except ValueError: - result.append(num) - if len(result) == 1: - return result[0] - return result +class HasLscIO(HasIO): + ioClass = IO - def get_par(self, cmd): - reply = self.communicate(cmd) - result = [] - for num in reply.split(','): - try: - result.append(float(num)) - except ValueError: - result.append(num) - if len(result) == 1: - return result[0] - return result + def query(self, msg, *converters): + """send a query and parse result + + :param msg: the query message, contains a '?' + :param converters: converters for the result values + :return: the converted value(s) + when more than one converter is given, this is a list + """ + response = self.communicate(msg).strip() + if not converters: + return response + values = sum((v.split(',') for v in response.split(';')), start=[]) + if len(values) < len(converters): + raise CommunicationFailedError( + f'expected {len(converters)} values,' + f' got {values} as reply to {msg}') + result = [c(v.strip()) for (c, v) in zip(converters, values)] + return result[0] if len(converters) == 1 else result + + def command(self, msghead, *args): + """send a command to change parameters and query the replies + + when no args are given, the command and a dummy request '*OPC?' + is sent in order to wait for completion. + + :param msghead: the message, including the arguments needed + for the query (e.g. channel, output_no) + :param args: parameters to set. + the converters for the reply are taken from their types + :return: the converted readback value(s) + when more than one argument is given, this is a list + """ + if not args: + self.communicate(f'{msghead};*OPC?') + return None + converters = [type(a) for a in args] + values = [a if issubclass(c, str) else f'{a:g}' + for c, a in zip(converters, args)] + if ' ' in msghead: + query = msghead.replace(' ', '? ', 1) + cmd = ','.join([msghead] + values) + else: + query = msghead + '?' + cmd = f"{msghead} {','.join(values)}" + return self.query(f'{cmd};{query}', *converters) -class Sensor340(LakeShore, Readable): - """A channel of 340TC""" +class Device(HasLscIO, Module): + """common SECoP module for curve handling""" + ioClass = IO + status = Parameter('status for installing calib', StatusType(Readable, 'BUSY')) + # ValueType(str): accept and convert also numbers + model = Property('model name', ValueType(str), value='') + curve_handling = Property('is curve handling enabled', BoolType(), default=False) + curve_cache = None + channels = None # sequence of available channels + user_curves = None # range of user curves (tuple) + log_formats = True, False # tuple (, ) + max_curve_length = 200 + cmds_per_line = 5 # number of commands / replies allowed on one line + _empty_curves = None + _request_lock = None + FAST_POLL = 0.01 - # define the communication class to create the IO module - ioClass = Ls340IO - channel = Property('lakeshore channel', StringType()) - alarm = Parameter('alarm limit', FloatRange(unit='K'), readonly=False) - # define or alter the parameters - # as Readable.value exists already, we give only the modified property 'unit' - value = Parameter(unit='K') + def initModule(self): + self._empty_curves = {} # dict of None (used as ordered set) + self._curve_map = {} # dict CurveHeader.key of curve_no + self._requests = {} # dict of CurveRequest + self._sensors = {} # dict of sensors + self._disable_channels = set(self.channels) + self._request_lock = threading.Lock() + self.io.setProperty('model', self.model) + super().initModule() - def read_value(self): - return self.get_par(f'KRDG? {self.channel}') + def curve_request(self, sensor): + """register a sensor for checking or uploading the calibration curve + + :param sensor: the sensor module + """ + with self._request_lock: + prev_calcurve = getattr(self._sensors.get(sensor.channel), 'calcurve', None) + self._sensors[sensor.channel] = sensor + if prev_calcurve in self._requests: + for ch, sens in self._sensors.items(): + if prev_calcurve == sens.calcurve: + break + else: + self._requests.pop(prev_calcurve, None) + self.log.info('interrupt installing %s', prev_calcurve) + prev_request = self._requests.get(sensor.calcurve) + req = CurveRequest(sensor, self) + if prev_request: + if np.array_equal(prev_request.points, req.points): + # a request is already running and the curve content has not changed + self.log.info('already installing %s', sensor.calcurve) + req.add_sensor(sensor) + else: + self.log.info('file has changed, restart treating %s', sensor.calcurve) + self._requests[sensor.calcurve] = req + else: + self._requests[sensor.calcurve] = req + self._disable_channels.discard(sensor.channel) + self.setFastPoll(True, self.FAST_POLL) + + def doPoll(self): + """this is not really a poller, but a worker task + + as loading and checking curves takes a while, this is done in the + background. does nothing after all curves are checked / loaded + and is reactivated only when curves are changed during runtime + """ + if not self.curve_handling or not self.io.is_connected: + return + if not self.curve_cache: + self.curve_cache = {} + headers = self.get_headers() + for curve_no in range(*self.user_curves): + crvhdr = CurveHeader(*headers[curve_no].split(',')) + self.curve_cache[curve_no] = crvhdr + if crvhdr.key: + if crvhdr in self._curve_map: + # this is a duplicate, add to empty curves + self._empty_curves[curve_no] = None + else: + self._curve_map[crvhdr.key] = curve_no + try: + check_item = None + with self._request_lock: + for key, request in self._requests.items(): + if request.loading: + break + if request.status[0] != ERROR: + check_item = key, request + else: + # no loading action found -> do check action + if check_item: + key, request = check_item + else: + request = None + if request: + request.action = request.action(request) + if request.action is not None: + return + self.finish_curve(self._requests.pop(key)) + # no more requests pending + while self._disable_channels: + ch = self._disable_channels.pop() + sensor = self._sensors.get(ch) + if sensor is None or sensor.disabled: + self.disable_channel(ch) + except Exception as e: + request.loading = False + request.status = ERROR, repr(e) + # self._requests.pop(key) + raise + if not self._requests: + self.setFastPoll(False) def read_status(self): - c = int(self.get_par(f'RDGST? {self.channel}')) - if c >= 128: - return ERROR, 'units overrange' - if c >= 64: - return ERROR, 'units zero' - if c >= 32: - return ERROR, 'temperature overrange' - if c >= 16: - return ERROR, 'temperature underrange' - # do not check for old reading -> this happens regularly on NTCs with T comp - if c % 2: - return ERROR, 'invalid reading' - # ask for high alarm status and return warning - if 1 in self.get_par(f'ALARMST? {self.channel}'): - return WARN, 'alarm triggered' + status = IDLE, '' + for req in self._requests.values(): + if req.loading: + return req.status + status = req.status + return status + + def get_empty(self): + """get curve no of a curve to be reused""" + if self._empty_curves: + # we have unused curve slots + curve_no = next(iter(self._empty_curves)) + self._empty_curves.pop(curve_no) + else: + used_no = set(s.curve_no for s in self._sensors.values()) + for req in self._requests.values(): + used_no.add(req.invalidate) + n0, n = self.user_curves + n -= n0 + # avoid to take the lower numbers first + # as then the most recent curves would be overridden + offset = random.randrange(n) + for i in range(n): + curve_no = n0 + (i + offset) % n + if curve_no not in used_no: + break + else: + raise ValueError('no empty curves available') + return curve_no + + def get_calib_state(self, sensor): + if not self.io.is_connected: + return ERROR, 'no connection' + req = self._requests.get(sensor.calcurve) + if req: + if req.loading: + return ImpossibleError(f'can not read T while {req.status[1]}') + if req.status[0] >= ERROR: + return ImpossibleError('error while loading calibration curve') + return None + + def verify_ends(self, request): + """preliminary check: check if the ends of the curve are matching the stored ones""" + npnt = len(request.points) + numbers = [1, npnt] + points = [request.points[0], request.points[npnt-1]] + if npnt < self.max_curve_length: + numbers.append(npnt + 1) + points.append((0, 0)) + for pairs in zip(points, self.get_crvpts(request.curve_no, *numbers)): + if not self.is_equal(*pairs): + return False + return True + + def find_curve(self, request): + """try to find curve and return required action + + :param request: the curve request + :return: next action + """ + no = self._curve_map.get(request.crvhdr.key) + if no: + request.crvhdr = self.curve_cache[no] + request.set_curve_no(no) + if self.verify_ends(request): + self.log.info('calcurve #%d %s found, start to check consistency', no, request.crvhdr) + # guess the curve is o.k. -> install + request.install_sensors(request.sensors.values()) + request.pointer = 1 + request.loading = False + request.status = IDLE, 'checking calcurve' + for sensor in request.sensors.values(): + sensor.pollInfo.trigger() + return self.check_points + request.invalidate = no + request.set_curve_no(self.get_empty()) + request.install_sensors(request.sensors.values()) + self.log.info('%s found, but content has changed, create #%d', + request.crvhdr.sn, request.curve_no) + return self.start_load + request.set_curve_no(self.get_empty()) + for sensor in request.sensors.values(): + sensor.install_sensor() + self.log.info('load curve #%d %s', request.curve_no, request.crvhdr) + return self.start_load + + def check_points(self, request): + """check next point + + :param request: the curve request + :return: next action + """ + if request.pointer >= len(request.points): + return None # finish + if request.new_sensors: + # install sensors for the same curve added in the meantime + sensors = request.new_sensors + request.new_sensors = set() + request.install_sensors(sensors) + given = request.points[request.pointer:request.pointer + self.cmds_per_line] + first = request.pointer + 1 + no = request.curve_no + for n, (pt, ptg) in enumerate(zip(self.get_crvpts(no, *range(first, first + len(given))), given)): + if not self.is_equal(pt, ptg): + self.log.info('reply (%g, %g) does not match given point %d (%g,%g)', + *pt, first + n, *ptg) + request.invalidate = no + request.set_curve_no(self.get_empty()) + self.log.info('%s has changed, create #%d', request.crvhdr.sn, request.curve_no) + request.loading = True + return self.start_load + + request.pointer += self.cmds_per_line + return self.check_points + + def start_load(self, request): + """start loading a curve + + :param request: the curve request + :return: next action + """ + request.status = BUSY, 'loading calcurve' + # delete first to make sure there is nothing left after the loaded points + self.command(f'CRVDEL {request.curve_no}') + # write a temporary header + # (in case loading will not finish, the curve should be marked as empty) + header = list(request.crvhdr) + self.put_header(request.curve_no, 'loading...', '', *header[2:]) + request.pointer = 0 + return self.load_points + + def load_points(self, request): + """load the next point(s) + + :param request: the curve request + :return: next action + """ + try: + given = request.points[request.pointer:request.pointer+self.cmds_per_line] + first = request.pointer + 1 + cmds = [f'CRVPT {request.curve_no},{first + n},{x:g},{y:g};' + f'CRVPT?{request.curve_no},{first + n}' for n, (x, y) in enumerate(given)] + for n, (reply, pt) in enumerate(zip(self.communicate(';'.join(cmds)).split(';'), given)): + if not self.is_equal([float(r) for r in reply.split(',')], pt): + # self.log.error('reply %r does not match given point %d (%g,%g)' + # '-> please check for values outside allowed range', reply, first + n, *pt) + raise HardwareError(f'reply {reply.strip()} does not match given point ' + f'{first + n} {pt[0]:g},{pt[1]:g}' + '-> please check for values outside allowed range') + request.pointer += self.cmds_per_line + return None if request.pointer >= len(request.points) else self.load_points + except Exception as e: + self.log.exception('error in load_points %s', e) + raise + + def finish_curve(self, request): + """write header and assign to channel(s) + + :param request: the curve request + :return: next action + """ + self.curve_cache[request.curve_no] = CurveHeader(*request.crvhdr) + self._curve_map[request.crvhdr.key] = request.curve_no + self.put_header(request.curve_no, *request.crvhdr) + request.install_sensors(request.sensors.values()) + no = request.invalidate + comment = 'finished' if request.loading else 'verified' + if no: + self.log.info('calcurve #%d %s %s, clear previous #%d', + request.curve_no, request.crvhdr, comment, no) + self.curve_cache[no] = CurveHeader() + self._empty_curves[no] = None + else: + self.log.info('calcurve #%d %s %s', + request.curve_no, request.crvhdr, comment) + request.status = None + for sensor in request.sensors.values(): + try: + sensor.pollInfo.trigger() + except Exception: + pass + + def put_header(self, curve_no, name, sn, fmt, limit, coef): + """write header""" + self.communicate(f'CRVHDR {curve_no},"{name}","{sn}",{fmt},{limit},{coef};*OPC?') + + def is_equal(self, left, right, fixeps=(1.1e-5, 1.1e-5), significant=6): + """check whether a returned calibration point is equal within curve point precision""" + for v1, v2, eps in zip(left, right, fixeps): + try: + assert_approx_equal(v1, v2, significant, verbose=False) + except AssertionError: + return abs(v1 - v2) < eps + return True + + def get_crvpts(self, curve_no, *numbers): + """read curve points + + :param curve_no: curve number + :param numbers: indices of points to retrieve + :return: list of points (x,y) + """ + replies = [] + for i in range(0, len(numbers), self.cmds_per_line): + cmds = [f'CRVPT?{curve_no},{n}' for n in numbers[i:i+self.cmds_per_line]] + replies.extend(self.communicate(';'.join(cmds)).split(';')) + return [[float(v) for v in xy.split(',')] for xy in replies] + + def get_headers(self): + """get all headers + + for performance reasons, by default 5 headers are read in one line + this is speeing up quite a lot on serial connections + """ + n0, n1 = self.user_curves + result = {} + for ni in range(n0, n1, self.cmds_per_line): + cmd = ';'.join(f'CRVHDR?{n}' for n in range(ni, min(n1, ni + self.cmds_per_line))) + for i, reply in enumerate(self.communicate(cmd).split(';')): + result[ni + i] = reply + return result + + def disable_channel(self, channel): + self.command(f'INTYPE {channel},0') + + +class CurveRequest: + invalidate = None + loading = True + status = BUSY, 'looking up calcurve' + + def __init__(self, sensor, device): + self.action = device.find_curve + self.new_sensors = set() + self.sensors = {sensor.channel: sensor} + calcurve = CalCurve(sensor.calcurve) + equipment_id = device.propertyValues.get('original_id') or device.secNode.equipment_id + name = f"{equipment_id.split('.')[0]}.{sensor.name}" + sn = calcurve.calibname + limit = calcurve.calibrange[1] + unit = calcurve.options.get('unit', 'Ohm') + logformat, range_limit = sensor.get_curve_type(calcurve) + self.points = calcurve.export(logformat, device.max_curve_length) + if range_limit: + x0, x1 = range_limit + if x0 > calcurve.xrange[0] or calcurve.xrange[1] > x0: + # we do not limit in the first place already, because of the following warning + if calcurve.xrange[1] > x1: + device.log.warn('curve %s cut at %g %s (upper end was %g %s)', + sn, x1, unit, calcurve.xrange[1], unit) + if x0 > calcurve.xrange[0]: + device.log.warn('curve %s cut at %g %s (lower end was %g %s)', + sn, x0, unit, calcurve.xrange[0], unit) + self.points = calcurve.export(logformat, device.max_curve_length, xlimits=range_limit) + ymax = max(self.points[0, 1], self.points[-1, 1]) + limit = min(limit, 10 ** ymax if calcurve.logformat[1] else ymax) + device.log.info('exported %d points from %s', len(self.points), calcurve.filename) + if unit == 'Ohm': + if calcurve.logformat[1]: + fmt = 5 + elif calcurve.logformat[0]: + fmt = 4 + else: + fmt = 3 + else: + fmt = 2 if unit == 'V' else 1 + coef = 1 + (self.points[0][1] < self.points[-1][1]) # 1: ntc, 2: ptc + self.crvhdr = CurveHeader(name, sn, fmt, limit, coef) + + def set_curve_no(self, curve_no): + self.curve_no = curve_no + for s in self.sensors.values(): + s.curve_no = curve_no + + def add_sensor(self, sensor): + self.sensors[sensor.channel] = sensor + self.new_sensors.add(sensor) + + def install_sensors(self, sensors=None): + for sensor in sensors: + sensor.install_sensor() + sensor.install_curve() + + +class CurveHeader(tuple): + """curve header class + + carries metadata of curves + """ + def __new__(cls, name='', sn='', fmt=0, limit=0, coef=0): + """initialize curve header. convert from string if needed + + :param name: a name + :param sn: the serial 'number' - is a string as it contains letters + :param fmt: 1..5 (mV,K), (V,K), (Ohm,K), (log Ohm, K), (log Ohm, log K) + :param limit: setpoint limit (K) + :param coef: 1, 2 (negative, positive) + """ + return tuple.__new__(cls, (name.strip(), sn.strip(), int(fmt), float(limit), int(coef))) + + @property + def name(self): + return self[0] + + @property + def sn(self): + return self[1] + + @property + def fmt(self): + return self[2] + + @property + def limit(self): + return self[3] + + @property + def coef(self): + return self[4] + + @property + def key(self): + """key for curve lookup""" + if self.sn: + return self.sn.lower(), self.fmt, self.coef + return None + + +def get_cfg_value(cfgdict, key, default=object): + # get value from a cfgdict + param = cfgdict.get(key) + try: + return param['value'] + except TypeError: + # param is not a dict + return param + except KeyError: + if type(param).__name__ != 'Param': + return param + if default is object: + raise KeyError(f'cfg dict does not contain {key!r}') + return default + + +class Base(HasLscIO): + model = None + device = Attached(Device, mandatory=False) # device is not needed without curve handling + + def initModule(self): + if self.device and not self.io: + self.io = self.device.io.name + super().initModule() + + +class Sensor(Base, Readable): + """base class for sensors""" + value = Parameter(unit='K') + status = Parameter(datatype=StatusType(Readable, 'DISABLED', 'BUSY')) + raw = Parameter('raw sensor value', datatype=FloatRange(unit='Ohm'), default=0) + channel = Property('used channel', StringType()) # input + calcurve = Parameter('calibration curve name', StringType(), readonly=False) + enabled = Parameter('enable flag', BoolType(), readonly=False) + classes = {} # dict of class + curve_no = None + STATUS_BIT_LABELS = 'invalid_reading old_reading b2 b3 t_under t_over units_zero units_overrange'.split() + _read_error = None + _raw_error = None + _do_read = True + _curve_handling = False # True when a device is present and device.curve_handling is True + # TODO: implement alarms + + def initModule(self): + super().initModule() + if self.device: + self._curve_handling = self.device.curve_handling + if not self._curve_handling: + self.parameters['calcurve'].setProperty('export', False) + + def doPoll(self): + self.read_status() # polls also value and raw + + @nopoll + def read_value(self): + self.status, value, self.raw = self.get_data() + if isinstance(value, SECoPError): + raise value.copy() + return value + + @nopoll + def read_raw(self): + self.status, self.value, raw = self.get_data() + if isinstance(raw, SECoPError): + raise raw.copy() + return raw + + def read_status(self): + try: + status, self.value, self.raw = self.get_data() + except Exception as e: + self.raw = self.value = secop_error(e) + raise + return status + + def get_data(self): + """get reading status, raw and Kelvin value with minimal delay""" + status = IDLE, '' + if self.enabled: + ch = self.channel + rdgst, raw, value = self.query(f'RDGST?{ch};SRDG?{ch};KRDG?{ch}', int, float, float) + rdgst &= 0xfd # suppress old reading + if rdgst: + statuslist = formatStatusBits(rdgst, self.STATUS_BIT_LABELS) + status = ERROR, statuslist[-1] # show only the most fatal error + value = HardwareError(statuslist[-1]) + while statuslist and statuslist[-1].startswith('t_'): + statuslist.pop() + if statuslist: + raw = HardwareError(statuslist[-1]) + elif self._curve_handling: + value_error = self.device.get_calib_state(self) + if value_error: + value = value_error + status = ERROR, str(value_error) + else: + raw = value = DisabledError('disabled') + status = DISABLED, 'disabled' + self.enable = False + return status, value, raw + + def write_calcurve(self, calibname): + if not self._curve_handling: + raise ConfigError('curve handling is off') + self.enabled = True + self.device.curve_request(self) + + def write_enabled(self, value): + if self._curve_handling: + if value: + self.write_calcurve(self.calcurve) + else: + self.device.disable_channel(self.channel) + + def read_enabled(self): + if not self.query(f'INTYPE?{self.channel}', int): + return False + return self.enabled + + def get_curve_type(self, calcurve): + unit = calcurve.options.get('unit', 'Ohm') + logformat = False + range_limit = (1e-5, 9999.99) + if unit == 'Ohm': + if calcurve.ptc: + # ,,,, + self.intype = 2, 1, 6, 1, 1 # PTC + else: + range_limit = (1e-5, 99999.9) + logformat = True, False + self.intype = 3, 1, 8, 1, 1 # NTC + elif unit == 'V': # diode + self.intype = (1, 1, 0, 0, 1) if calcurve.xscale[1] <= 2.5 else (1, 1, 1, 0, 1) + else: # thermocouple + self.intype = (4, 0, 0, 1, 1) + return logformat, range_limit + + def install_sensor(self): + if self.query(f'INTYPE?{self.channel}', *(type(v) for v in self.intype)) != self.intype: + self.command(f'INTYPE {self.channel}', *self.intype) + + def install_curve(self): + if self.query(f'INCRV?{self.channel}', int) != self.curve_no: + self.command(f'INCRV {self.channel}', self.curve_no) + + +class Output(Base, HasControlledBy, Writable): + """base class for heater output""" + classes = {} # dict of dict of class + value = Parameter('heater output', FloatRange(0, 100, unit='W')) + target = Parameter('manual heater output', FloatRange(0, 100, unit='W'), default=0) + htr = Parameter('heater output current percentage', FloatRange(0, 100, unit='%')) + max_heater = Parameter('desired max. heater with unit W, A or V', StringType(), readonly=False) + max_power = Parameter('''max heater power\n + value will be clamped to allowed range and may be rounded to discrete values + ''', FloatRange(0, 100, unit='W'), readonly=False) + # for the case when several outputs are possible: + output_no = Parameter('lakeshore output or loop number', IntRange(1, 4), default=1) + resistance = Parameter('heater resistance', FloatRange(10, 100), readonly=False, default=25) + Extension = None + AMP_VOLT_WATT = NumberWithUnit('A', 'V', 'W') + imax = None + vmax = None + n_ranges = None + n_currents = None + _power_scale = 1e-4 + # sorted_factors: list of possible current squares (I^2, idx) + # (multiply I^2 with resistance to get power) + sorted_factors = None + errorstatus = None + _desired_max_power = None + _control_loop = None # a loop module object when controlled, None when in manual mode + power_offset = 0 # offset for closed_loop + + def configure(self): + """configure the output + + based on self._control_loop, self.resistance and self._desired_max_power + """ + raise NotImplementedError + + def set_closed_loop(self, loop): + """set to control mode + + :param loop: the temperature loop module + """ + if self._control_loop != loop: + self.log.info(f'set output to be controlled on channel {loop.channel}') + self._control_loop = loop + self.configure() + else: + self.fix_heater_range() + + def set_open_loop(self): + """set to open loop""" + if self._control_loop is not None: + self.log.info('put output into manual mode') + self._control_loop = None + self.configure() + else: + self.fix_heater_range() + + def fix_heater_range(self): + """switch on heater range, if off""" + + def write_output_no(self, output_no): + if self._desired_max_power is not None: + self.output_no = output_no # early update needed by configure() + self.configure() + + def write_resistance(self, resistance): + if self._desired_max_power is not None: + self.resistance = resistance # early update needed by configure() + self.configure() + + def get_best_power_idx(self, max_power, round_down_limit=1.0001): + """get best index from sorted_factors to match given max_current""" + prev_pwr = 0 + prev_idx = None + for fact, idx in self.sorted_factors: + pwr = min(fact * self.resistance, self.vmax ** 2 / self.resistance) + if pwr >= max_power: + if pwr == prev_pwr: + # happens when we reached voltage compliance limit + return prev_idx + if max_power - prev_pwr < pwr - max_power and max_power < prev_pwr * round_down_limit: + # previous value is closer and not above round down limit + return prev_idx + return idx + prev_idx = idx + return prev_idx + + def calc_power(self, percent): + # power is limited by voltage compliance + return min(percent ** 2 * self._power_scale * self.resistance, + self.vmax ** 2 / self.resistance) + + def calc_percent(self, power): + return min(100., math.sqrt(power / self.resistance / self._power_scale)) + + def read_status(self): + """get heater status""" return IDLE, '' - def write_alarm(self, alarm): - return self.set_par(f'ALARM {self.channel}', 1, 1, alarm, 0, 0, 2)[2] + def put_manual_power(self, value): + self.command(f'MOUT {self.output_no}', value) - def read_alarm(self): - return self.get_par(f'ALARM? {self.channel}')[2] + def read_value(self): + return self.calc_power(self.read_htr()) - -class HeaterOutput(LakeShore, HasControlledBy, HasIO, Writable): - max_power = Parameter('max heater power', datatype=FloatRange(0, 100), unit='W', readonly=False) - value = Parameter('heater output', datatype=FloatRange(0, 100), unit='W') - target = Parameter('manual heater output', datatype=FloatRange(0, 100), unit='W') - loop = Property('lakeshore loop', IntRange(1, 2), default=1) # output - channel = Property('attached channel', StringType()) # input - resistance = Property('heater resistance', datatype=FloatRange(10, 100)) - _range = 0 - _max_power = 50 - - MAXCURRENTS = {1: 0.25, 2: 0.5, 3: 1.0, 4: 2.0} - RANGES = {1: 1e4, 2: 1e3, 3: 1e2, 4: 1e1, 5: 1} - - SETPOINTLIMS = 999 - - STATUS_MAP = { - 0: (IDLE, ''), - 1: (ERROR, 'Power supply over voltage'), - 2: (ERROR, 'Power supply under voltage'), - 3: (ERROR, 'Output digital-to-analog Converter error'), - 4: (ERROR, 'Current limit digital-to-analog converter error'), - 5: (ERROR, 'Open heater load'), - 6: (ERROR, 'Heater load less than 10 ohms') - } - - def earlyInit(self): - super().earlyInit() - self.CHOICES = sorted([(maxcurrent ** 2 * factor, icurrent, irange) - for irange, factor in self.RANGES.items() - for icurrent, maxcurrent in self.MAXCURRENTS.items()]) + def read_htr(self): + """read heater percent""" + raise NotImplementedError def write_max_power(self, max_power): - prev = 0 - for i, (factor, icurrent, irange) in enumerate(self.CHOICES): - power = min(factor * self.resistance, 2500 / self.resistance) - if power >= max_power: - if prev >= max_power * 0.9 or prev == power: - icurrent, irange = self.CHOICES[i - 1][1:3] - break - prev = power - self._range = irange - self.set_par(f'CLIMIT {self.loop}', self.SETPOINTLIMS, 0, 0, icurrent, irange) - self.set_par(f'RANGE {irange}') - self.set_par(f'CDISP {self.loop}', 1, self.resistance, 1, 0) + self._desired_max_power = max_power + self.configure() + return self.calc_power(100) - def read_max_power(self): - setplimit, _, _, icurrent, irange = self.get_par(f'CLIMIT? {self.loop}') - # max_power from codes disregarding voltage limit: - self._max_power = self.MAXCURRENTS[icurrent] ** 2 * self.RANGES[irange] * self.resistance - # voltage limit = 50V: - max_power = min(self._max_power, 2500 / self.resistance) - return max_power - - def set_range(self): - self.set_par('RANGE ', self._range) - - def percent_to_power(self, percent): - return min((percent / 100) ** 2 * self._max_power, - 2500 / self.resistance) - - def power_to_percent(self, power): - return (power / self._max_power) ** (1 / 2) * 100 # limit - - def read_status(self): - return self.STATUS_MAP[self.get_par(f'HTRST?')] + def write_max_heater(self, value): + number, unit = self.AMP_VOLT_WATT.parse(value) + pmax = self['target'].datatype.max + if unit == 'A': + power = self.write_max_power(min(pmax, number ** 2 * self.resistance)) + number = math.sqrt(power / self.resistance) + elif unit == 'V': + power = self.write_max_power(min(pmax, number ** 2 / self.resistance)) + number = math.sqrt(power * self.resistance) + else: + number = self.write_max_power(min(pmax, number)) + return format_with_unit(number, unit, 3) def write_target(self, target): self.self_controlled() - self.write_max_power(self.max_power) - self.set_heater_mode(3) # 3=open loop - self.set_range() - percent = self.power_to_percent(target) - reply = self.set_par(f'MOUT {self.loop}', percent) - return self.percent_to_power(reply) - - def set_heater_mode(self, mode): - self.set_par(f'CSET {self.loop}', self.channel, 1, 1, 0) - self.set_par(f'CMODE {self.loop}', int(mode)) - return self.get_par(f'RANGE?') - - def read_value(self): - return self.percent_to_power(self.get_par(f'HTR?{self.loop}')) + self.put_manual_power(target) -class HeaterOutput340(HeaterOutput): - resistance = Property('heater resistance', datatype=FloatRange(10, 100)) +class MainOutput(Output): + """power output with adjustable power - MAXCURRENTS = {1: 0.25, 2: 0.5, 3: 1.0, 4: 2.0} - RANGES = {1: 1e4, 2: 1e3, 3: 1e2, 4: 1e1, 5: 1} + including common code for main output(s) on 336 / 350 + """ - STATUS_MAP = { - 0: (IDLE, ''), - 1: (ERROR, 'Power supply over voltage'), - 2: (ERROR, 'Power supply under voltage'), - 3: (ERROR, 'Output digital-to-analog Converter error'), - 4: (ERROR, 'Current limit digital-to-analog converter error'), - 5: (ERROR, 'Open heater load'), - 6: (ERROR, 'Heater load less than 10 ohms') - } - - def read_value(self): - return self.percent_to_power(self.get_par(f'HTR?')) # no loop to be given on 340 - - -class HeaterOutput336(HeaterOutput): - - power = 20 - - STATUS_MAP = { + HTRST_MAP = { 0: (IDLE, ''), 1: (ERROR, 'Open heater load'), 2: (ERROR, 'Heater short') } - def write_max_power(self, max_power): - max_current = min(math.sqrt(self.power / self.resistance), 2500 / self.resistance) - if self.loop == 1: - max_current_limit = 2 - else: - max_current_limit = 1.414 - if max_current > max_current_limit: - raise RangeError('max_power above limit') - if max_current >= max_current_limit / math.sqrt(10): - self._range = 3 - user_current = max_current - elif max_current >= max_current_limit / 10: - self._range = 2 - user_current = max_current * math.sqrt(10) - else: - self._range = 1 - user_current = max_current * math.sqrt(100) - self.set_par(f'HTRSET {self.loop}', 1 if self.resistance < 50 else 2, 0, user_current, 1) - max_power = max_current ** 2 * self.resistance - self._max_power = max_power - self.set_range() - return max_power - - -class TemperatureLoop340(HasConvergence, HasOutputModule, Sensor340, Drivable, LakeShore): - Status = Enum( - Drivable.Status, - RAMPING=370, - STABILIZING=380, - ) - - target = Parameter(unit='K') - ctrlpars = Parameter('PID parameters', - StructOf(p=FloatRange(0, 1000), i=FloatRange(0, 1000), d=FloatRange(0, 1000)), - readonly=False) - loop = Property('lakeshore loop', IntRange(1, 2), default=1) - ramp = Parameter('ramp rate', FloatRange(min=0, max=100), unit='K/min', readonly=False) - ramp_used = Parameter('whether ramp is used or not', BoolType(), readonly=False) - setpoint = Parameter('setpoint', datatype=FloatRange, unit='K') - - def write_target(self, target): - out = self.output_module - if not self.control_active: - if self.ramp_used: - self.set_par(f'RAMP {self.loop}', 0, self.ramp) - self.set_par(f'SETP {self.loop}', self.value) - self.set_par(f'RAMP {self.loop}', 1, self.ramp) - out.write_target(0) - out.write_max_power(out.max_power) - out.set_heater_mode(1) # closed loop - self.activate_output() - self.start_state() # start the convergence check - out.set_range() - self.set_par(f'SETP {self.loop}', target) - return target - - def read_setpoint(self): - setpoint = self.get_par(f'SETP?{self.loop}') - status = self.get_par(f'RAMPST? {self.loop}') - if status == 0: - self.target = setpoint - return setpoint - - def write_ctrlpars(self, ctrlpars): - p, i, d = self.set_par(f'PID {self.loop}', ctrlpars['p'], ctrlpars['i'], ctrlpars['d']) - return {'p': p, 'i': i, 'd': d} - - def read_ctrlpars(self): - p, i, d = self.get_par(f'PID? {self.loop}') - return {'p': p, 'i': i, 'd': d} - - def read_ramp(self): - self.ramp_used, rate = self.get_par(f'RAMP? {self.loop}') - return rate - - def write_ramp(self, ramp): - self.ramp_used = True - ramp = self.set_par(f'RAMP {self.loop}', self.ramp_used, ramp)[1] - # if self.control: - # self.ramp = ramp - # self.write_target(self.target) - # return Done - return ramp - - def write_ramp_used(self, ramp_used): - ramp_used = self.set_par(f'RAMP {self.loop}', ramp_used, self.ramp)[0] - if self.ramp_used and not ramp_used: - self.write_target(self.target) - return ramp_used + _htr_range = 1 + heater_ranges = {5 - i: 10 ** - i for i in range(5)} + sorted_factors = sorted((v, i) for i, v in heater_ranges.items()) def read_status(self): - statuscode, statustext = self.status - if self.ramp_used: - if self.read_setpoint() == self.target: - statuscode = self.Status.STABILIZING + st = self.query(f'HTRST? {self.output_no}', int) + return self.HTRST_MAP[st] + + def configure(self): + self._htr_range = irng = self.get_best_power_idx(self._desired_max_power) + user_current = max(0.1, min(self.imax, 2 * math.sqrt(self._desired_max_power / + self.heater_ranges[irng] / self.resistance))) + self._power_scale = user_current ** 2 * self.heater_ranges[irng] / 1e4 + self.command(f'HTRSET {self.output_no}', 1 if self.resistance < 50 else 2, 0, user_current, 1) + # self.command(f'CDISP {self.output_no}', 1, self.resistance, 1, 0) + if self._control_loop is None: + mode = self.query(f'OUTMODE?{self.output_no}', int) + if mode != 3: # open loop + self.command(f'OUTMODE {self.output_no}', 3) # control off + # self.command(f'OUTMODE {self.output_no}', 3, self.channel, 0) # control off + self.command(f'RANGE {self.output_no}', self._htr_range) + self.put_manual_power(self.target) + else: + self.command(f'OUTMODE {self.output_no}', 1, self._control_loop.channel, 0) # control on + self.command(f'RANGE {self.output_no}', self._htr_range) + self.put_manual_power(self._control_loop.power_offset) + + def read_max_power(self): + curidx, self._user_current = self.query(f'HTRSET? {self.output_no}', int, int, float)[1:3] + if not self._user_current: + self._user_current = 2.0 ** (curidx * 0.5 - 1) + self._htr_range = self.query(f'RANGE?{self.output_no}', int) or self._htr_range + self._power_scale = self.imax ** 2 * self.heater_ranges[self._htr_range] / 1e4 + return self.calc_power(100) + + def fix_heater_range(self): + # switch heater range on, if needed + irng = self.query(f'RANGE?{self.output_no}', int) + if irng != self._htr_range: + if irng: + self.log.info('output range was changed manually') + self._htr_range = irng else: - statuscode = self.Status.RAMPING - statustext = 'ramping' - if statuscode != ERROR: - return Done - if self.convergence_state.is_active: - self.stop_machine((statuscode, statustext)) - return ERROR, statustext + self.log.info('output was off - switch on again') + self.command(f'RANGE {self.output_no}', self._htr_range) + + def read_htr(self): + return self.query(f'HTR? {self.output_no}', float) +class AnalogOutput(Output): + """low power (max. 1W) output""" + output_no = Parameter(datatype=IntRange(3, 4), default=3) + resistance = Parameter(default=100) + max_power = Parameter(readonly=True) + max_heater = Parameter(readonly=True) + vmax = 10 + imax = 0.1 + def read_max_power(self): + # max power can not be changed + # -> the return value depends on the resistance only + self._power_scale = self.vmax ** 2 / 1e4 + return self.calc_power(100) + + def write_max_power(self, value): + # max power is not configurable + return self.read_max_power() + + def write_output_no(self, output_no): + self.output_no = output_no + self.configure() + + def calc_power(self, percent): + # power is limited by max current + return min(percent ** 2 * self._power_scale / self.resistance, + self.imax ** 2 / self.resistance) + + def configure(self): + if self._control_loop is None: + # 3: open loop, 0: powerup enable off + self.command(f'OUTMODE {self.output_no}', 3, 0, 0) + self.put_manual_power(self.target) + else: + # 1: closed loop, 0: powerup enable off + self.command(f'OUTMODE {self.output_no}', 1, self._control_loop.channel, 0) + self.put_manual_power(self._control_loop.power_offset) + + def read_htr(self): + return self.query(f'AOUT?{self.output_no}', float) + + +class Loop(HasConvergence, HasOutputModule, Sensor): + output_module = Attached(Output) + # this creates individual parameters pid_p, pid_i and pid_d automatically + ctrlpars = StructParam('ctrlpars struct', { + 'p': Parameter('control parameter p', FloatRange(0, 1000)), + 'i': Parameter('control parameter i', FloatRange(0, 1000)), + 'd': Parameter('control parameter d', FloatRange(0, 1000)), + }, prefix='pid_', readonly=False) + power_offset = Parameter('power offset\n\n(using LakeShores Manual Output)', + FloatRange(0, 100, unit='W'), readonly=False, default=0) + classes = {} + + def __init_subclass__(cls): + super().__init_subclass__() + if cls.model: # do not register abstract base classes + cls.model = str(cls.model) + cls.classes[cls.model] = cls + + def write_ctrlpars(self, pid): + pid = self.command(f'PID {self.output_module.output_no}', *[pid[k] for k in 'pid']) + return dict(zip('pid', pid)) + + def read_ctrlpars(self): + pid = self.query(f'PID?{self.output_module.output_no}', float, float, float) + return dict(zip('pid', pid)) + + def write_target(self, value): + sensor_status = Sensor.read_status(self) + if sensor_status[0] >= BUSY: + raise ImpossibleError(f'can not control while status is {sensor_status}') + self.activate_control() + self.output_module.set_closed_loop(self) + self.set_target(value) + + def write_power_offset(self, value): + if self.control_active: + self.output_module.put_manual_power(value) + + def read_target(self): + return self.get_target() + + def set_target(self, value): + return self.command(f'SETP {self.output_module.output_no}', float(value)) + + def get_target(self): + return self.query(f'SETP?{self.output_module.output_no}', float) + + +# --- MODEL 340 --- + +class IO340(IO): + timeout = 5 # needed for INCRV command + model = 340 + end_of_line = '\r' # default at SINQ. TODO: remove + + +class Device340(Device): + ioClass = IO340 + model = 340 + channels = 'ABCD' + user_curves = (21, 61) # the last curve is 60 + log_formats = True, True # log Ohm / log K is supported + _crvsav_deadline = None + + def disable_channel(self, channel): + self.communicate(f'INSET {channel},0;*OPC?') + + def finish_curve(self, request): + super().finish_curve(request) + if request.loading and not self._crvsav_deadline: + # when loading a new curve, remember for sending CRVSAV later + self._crvsav_deadline = time.time() + 600 + + def put_header(self, curve_no, name, sn, fmt, limit, coef): + self.communicate(f'CRVHDR {curve_no},{name:15s},{sn:10s},{fmt},{limit},{coef};*OPC?') + + def doPoll(self): + super().doPoll() + if self._crvsav_deadline: + # prevent flashing multiple times within short time + if time.time() > self._crvsav_deadline: + self._crvsav_deadline = None + self.communicate('CRVSAV;*OPC?') + + def is_equal(self, left, right, fixeps=(1.1e-5, 1.1e-4), significant=6): + # for whatever reason, the number of digits after decimal point for the T column is only 4 + return super().is_equal(left, right, fixeps, significant) + + +class Sensor340(Sensor): + model = 340 + intype = None # arguments for the intype command + + def get_curve_type(self, calcurve): + unit = calcurve.options.get('unit', 'Ohm') + logformat = False + range_limit = None + if unit == 'Ohm': + if calcurve.ptc: + xlim = calcurve.xrange[1] * 0.001 # 1 mA excitation + for rng, limit in enumerate( + [0, 0.001, 0.0025, 0.005, 0.01, 0.025, 0.05, + 0.1, 0.25, 0.5, 1, 2.5, 5]): + if xlim <= limit: + break + else: + rng = 13 + # we use special type here, in order to allow thermal compensation + # , , , , + self.intype = 0, 2, 2, 10, rng + else: + if calcurve.options.get('type') == 'GE': + self.intype = (10,) + else: + self.intype = (8,) # carbon and ruox are equivalent to cernox(8) + logformat = True, True + elif unit == 'V': # diode + self.intype = (1,) if calcurve.xscale[1] < 2.5 else (2,) + else: # thermocouple + self.intype = (12,) + return logformat, range_limit + + def install_sensor(self): + super().install_sensor() + if self.query(f'INSET?{self.channel}', int, int) != (1, 1): + self.command(f'INSET {self.channel}', 1, 1) + + +class Loop340(Loop, Sensor340): + pass + + +class MainOutput340(MainOutput): + model = 340 + output_no = Parameter(datatype=IntRange(1, 1)) + HTRST_MAP = { + 0: (IDLE, ''), + 1: (ERROR, 'Power supply over voltage'), + 2: (ERROR, 'Power supply under voltage'), + 3: (ERROR, 'Output digital-to-analog Converter error'), + 4: (ERROR, 'Current limit digital-to-analog converter error'), + 5: (ERROR, 'Open heater load'), + 6: (ERROR, 'Heater load less than 10 ohms') + } + _manual_output = 0.0 # TODO: check how to set this + vmax = 50 + imax = 2 + max_currents = {4 - i: 2 ** (1 - i) for i in range(4)} + SETPOINTLIMS = 1500.0 + sorted_factors = sorted([(fhtr * fcur ** 2, (i, h)) + for i, fcur in max_currents.items() + for h, fhtr in MainOutput.heater_ranges.items() + ]) + + def get_status(self): + st = self.query(f'HTRST?', int) + return self.HTRST_MAP[st] + + def configure(self): + icurrent, htr_range = self.get_best_power_idx(self._desired_max_power, 1.1) + self._power_scale = self.max_currents[icurrent] ** 2 * self.heater_ranges[htr_range] / 1e4 + self.command(f'CLIMIT {self.output_no}', self.SETPOINTLIMS, 0.0, 0.0, icurrent, htr_range) + self._htr_range = htr_range + if self._control_loop is None: + mode = self.query(f'CMODE?{self.output_no}', int) + if mode != 3: # open loop + self.command(f'CSET {self.output_no}', '0', 1, 0, 0) # control off + self.command(f'CMODE {self.output_no}', 3) # open loop + self.command('RANGE', self._htr_range) + self.put_manual_power(self._manual_output) + else: + self.command(f'CSET {self.output_no}', self._control_loop.channel, 1, 1, 0) # control on + self.command(f'CMODE {self.output_no}', 1) # pid + self.command('RANGE', self._htr_range) + self.put_manual_power(self._control_loop.power_offset) + self.command(f'CDISP {self.output_no}', 1, self.resistance, 1, 0) + + def fix_heater_range(self): + # switch heater range on, if needed + irng = self.query('RANGE?', int) + if irng != self._htr_range: + if irng: + self.log.info('output range was changed manually') + self._htr_range = irng + else: + self.log.info('output was off - switch on again') + self.command('RANGE', self._htr_range) + + def read_max_power(self): + icurrent, htr_range = self.query(f'CLIMIT? {self.output_no}', float, float, float, int, int)[3:] + htr_range = self.query('RANGE?', int) or htr_range + self._htr_range = htr_range + self._power_scale = self.max_currents[icurrent] ** 2 * self.heater_ranges[htr_range] / 1e4 + return self.calc_power(100) + + def read_htr(self): + return self.query('HTR?', float) + + +class AnalogOutput340(AnalogOutput): + model = 340 + output_no = Parameter(datatype=IntRange(2, 2), default=2) + + def configure(self): + if self._control_loop is not None: + self.put_manual_power(self.target) + else: + self.command('ANALOG 2', 0, 3, self._control_loop.channel) + self.put_manual_power(self._control_loop.power_offset) + + def put_manual_power(self, value): + if self._control_loop is None: + self.command('ANALOG 2', 0, 2, 0, 0, 0, 0, 0, self.calc_percent(value)) + else: + self.command(f'MOUT {self.output_no}', self.calc_percent(value)) + + +# --- MODELS 336, 350, 224, ... +class Device2(Device): + """second generation LakeShore models""" + channels = 'ABCD' + user_curves = (21, 60) # the last curve is 59 + max_raw_unit = 99999 + TYPES = {'DT': 1, 'TG': 1, 'PT': 2, 'RF': 2, 'CX': 3, 'RX': 3, 'CC': 3, 'GE': 3, 'TC': 4} + + +# --- MODEL 336 --- + +class IO336(IO): + model = 336 + + +class Device336(Device2): + model = 336 + + +class Sensor336(Sensor): + model = 336 + + +class Loop336(Loop, Sensor336): + pass + + +class MainOutput336(MainOutput): + model = 336 + output_no = Parameter(datatype=IntRange(1, 1)) + imax = 2 + vmax = 50 + # 3 ranges only + heater_ranges = {i: 10 ** (3 - i) for i in range(5)} + sorted_factors = sorted((v, i) for i, v in heater_ranges.items()) + + +class SecondaryOutput336(MainOutput336): + model = 336 + output_no = Parameter(datatype=IntRange(2, 2)) + imax = 1.414 + vmax = 35.4 + max_power = Parameter(datatype=FloatRange(0, 50, unit='W')) + + +class AnalogOutput336(AnalogOutput): + model = 336 + output_no = Parameter(datatype=IntRange(3, 4)) + + +# --- MODEL 350 --- + +class Device350(Device2): + model = 350 + + +class Sensor350(Sensor): + model = 350 + + def get_curve_type(self, calcurve): + logformat, range_limit = super().get_curve_type(calcurve) + excit = 0 + if self.intype[0] == 3 and calcurve.calibrange[0] > 0.2: + excit = 1 # TODO: add extra parameter for excitation + self.intype += (excit,) + return logformat, range_limit + + +class Loop350(Loop, Sensor350): + pass + + +class MainOutput350(Output): + model = 350 + output_no = Parameter(datatype=IntRange(1, 1)) + imax = 1.732 + max_power = Parameter(datatype=FloatRange(0, 75, unit='W')) + + +class AnalogOutput350(AnalogOutput): + model = 350 + output_no = Parameter(datatype=IntRange(3,4)) + + +# --- MODEL 224 --- + +class Device224(Device2): + model = 224 + channels = 'A', 'B', 'C1', 'C2', 'C3', 'C4', 'C5', 'D1', 'D2', 'D3', 'D4', 'D5' + + +class Sensor224(Sensor): + model = 224