1254 lines
47 KiB
Python
1254 lines
47 KiB
Python
# *****************************************************************************
|
|
# This program is free software; you can redistribute it and/or modify it under
|
|
# the terms of the GNU General Public License as published by the Free Software
|
|
# Foundation; either version 2 of the License, or (at your option) any later
|
|
# version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful, but WITHOUT
|
|
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
|
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
|
|
# details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License along with
|
|
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
|
#
|
|
# Module authors:
|
|
# Oksana Shliakhtun <oksana.shliakhtun@psi.ch>
|
|
# Markus Zolliker <markus.zolliker@psi.ch>
|
|
# *****************************************************************************
|
|
"""driver for various lakeshore temperature monitors/controllers"""
|
|
|
|
import time
|
|
import math
|
|
import random
|
|
import threading
|
|
import numpy as np
|
|
from numpy.testing import assert_approx_equal
|
|
|
|
from frappy.core import Module, Readable, Parameter, Property, \
|
|
HasIO, StringIO, Writable, IDLE, ERROR, BUSY, DISABLED, nopoll, Attached
|
|
from frappy.datatypes import IntRange, FloatRange, StringType, \
|
|
BoolType, ValueType, StatusType
|
|
from frappy.errors import CommunicationFailedError, ConfigError, \
|
|
HardwareError, DisabledError, ImpossibleError, secop_error, SECoPError
|
|
from frappy.lib.units import NumberWithUnit, format_with_unit
|
|
from frappy.lib import formatStatusBits
|
|
from frappy_psi.convergence import HasConvergence
|
|
from frappy.mixins import HasOutputModule, HasControlledBy
|
|
from frappy.extparams import StructParam
|
|
from frappy_psi.calcurve import CalCurve
|
|
|
|
|
|
def string_to_num(string):
|
|
try:
|
|
return int(string)
|
|
except ValueError:
|
|
return float(string)
|
|
|
|
|
|
class IO(StringIO):
|
|
"""IO classes of most LakeShore models will inherit from this"""
|
|
end_of_line = '\n'
|
|
wait_before = -1
|
|
# ValueType(str): accept and convert also numbers
|
|
model = Property('model name', ValueType(str), value='')
|
|
|
|
def initModule(self):
|
|
if self.wait_before < 0:
|
|
# wait before is not needed when connected via LAN
|
|
# as the port number is always 7777, the following is a quite
|
|
# good guess
|
|
self.wait_before = 0 if ':7777' in self.uri else 0.05
|
|
super().initModule()
|
|
|
|
def checkHWIdent(self):
|
|
self.identification = [('*IDN?', f'LSCI,MODEL{self.model},.*')]
|
|
super().checkHWIdent()
|
|
|
|
|
|
class HasLscIO(HasIO):
|
|
ioClass = IO
|
|
|
|
def query(self, msg, *converters):
|
|
"""send a query and parse result
|
|
|
|
:param msg: the query message, contains a '?'
|
|
:param converters: converters for the result values
|
|
:return: the converted value(s)
|
|
when more than one converter is given, this is a list
|
|
"""
|
|
response = self.communicate(msg).strip()
|
|
if not converters:
|
|
return response
|
|
values = sum((v.split(',') for v in response.split(';')), start=[])
|
|
if len(values) < len(converters):
|
|
raise CommunicationFailedError(
|
|
f'expected {len(converters)} values,'
|
|
f' got {values} as reply to {msg}')
|
|
result = [c(v.strip()) for (c, v) in zip(converters, values)]
|
|
return result[0] if len(converters) == 1 else result
|
|
|
|
def command(self, msghead, *args):
|
|
"""send a command to change parameters and query the replies
|
|
|
|
when no args are given, the command and a dummy request '*OPC?'
|
|
is sent in order to wait for completion.
|
|
|
|
:param msghead: the message, including the arguments needed
|
|
for the query (e.g. channel, output_no)
|
|
:param args: parameters to set.
|
|
the converters for the reply are taken from their types
|
|
:return: the converted readback value(s)
|
|
when more than one argument is given, this is a list
|
|
"""
|
|
if not args:
|
|
self.communicate(f'{msghead};*OPC?')
|
|
return None
|
|
# when an argument is given as integer, it might be that this argument might be a float
|
|
converters = [string_to_num if isinstance(a, int) else type(a) for a in args]
|
|
values = [a if isinstance(a, str) else f'{a:g}'
|
|
for c, a in zip(converters, args)]
|
|
if ' ' in msghead:
|
|
query = msghead.replace(' ', '? ', 1)
|
|
cmd = ','.join([msghead] + values)
|
|
else:
|
|
query = msghead + '?'
|
|
cmd = f"{msghead} {','.join(values)}"
|
|
return self.query(f'{cmd};{query}', *converters)
|
|
|
|
|
|
class Device(HasLscIO, Module):
|
|
"""common SECoP module for curve handling"""
|
|
ioClass = IO
|
|
status = Parameter('status for installing calib', StatusType(Readable, 'BUSY'))
|
|
# ValueType(str): accept and convert also numbers
|
|
model = Property('model name', ValueType(str), value='')
|
|
curve_handling = Property('is curve handling enabled', BoolType(), default=False)
|
|
curve_cache = None
|
|
channels = None # sequence of available channels
|
|
user_curves = None # range of user curves (tuple)
|
|
log_formats = True, False # tuple (<log Ohm allowed>, <log K allowed>)
|
|
max_curve_length = 200
|
|
cmds_per_line = 5 # number of commands / replies allowed on one line
|
|
_empty_curves = None
|
|
_request_lock = None
|
|
FAST_POLL = 0.01
|
|
|
|
def initModule(self):
|
|
self._empty_curves = {} # dict <curve no> of None (used as ordered set)
|
|
self._curve_map = {} # dict CurveHeader.key of curve_no
|
|
self._requests = {} # dict <calcurve> of CurveRequest
|
|
self._sensors = {} # dict <channel> of sensors
|
|
self._disable_channels = set(self.channels)
|
|
self._request_lock = threading.Lock()
|
|
self.io.setProperty('model', self.model)
|
|
super().initModule()
|
|
|
|
def curve_request(self, sensor):
|
|
"""register a sensor for checking or uploading the calibration curve
|
|
|
|
:param sensor: the sensor module
|
|
"""
|
|
with self._request_lock:
|
|
prev_calcurve = getattr(self._sensors.get(sensor.channel), 'calcurve', None)
|
|
self._sensors[sensor.channel] = sensor
|
|
if prev_calcurve in self._requests:
|
|
for ch, sens in self._sensors.items():
|
|
if prev_calcurve == sens.calcurve:
|
|
break
|
|
else:
|
|
self._requests.pop(prev_calcurve, None)
|
|
self.log.info('interrupt installing %s', prev_calcurve)
|
|
prev_request = self._requests.get(sensor.calcurve)
|
|
req = CurveRequest(sensor, self)
|
|
if prev_request:
|
|
if np.array_equal(prev_request.points, req.points):
|
|
# a request is already running and the curve content has not changed
|
|
self.log.info('already installing %s', sensor.calcurve)
|
|
req.add_sensor(sensor)
|
|
else:
|
|
self.log.info('file has changed, restart treating %s', sensor.calcurve)
|
|
self._requests[sensor.calcurve] = req
|
|
else:
|
|
self._requests[sensor.calcurve] = req
|
|
self._disable_channels.discard(sensor.channel)
|
|
self.setFastPoll(True, self.FAST_POLL)
|
|
|
|
def doPoll(self):
|
|
"""this is not really a poller, but a worker task
|
|
|
|
as loading and checking curves takes a while, this is done in the
|
|
background. does nothing after all curves are checked / loaded
|
|
and is reactivated only when curves are changed during runtime
|
|
"""
|
|
if not self.curve_handling or not self.io.is_connected:
|
|
return
|
|
if not self.curve_cache:
|
|
self.curve_cache = {}
|
|
headers = self.get_headers()
|
|
for curve_no in range(*self.user_curves):
|
|
crvhdr = CurveHeader(*headers[curve_no].split(','))
|
|
self.curve_cache[curve_no] = crvhdr
|
|
if crvhdr.key:
|
|
if crvhdr in self._curve_map:
|
|
# this is a duplicate, add to empty curves
|
|
self._empty_curves[curve_no] = None
|
|
else:
|
|
self._curve_map[crvhdr.key] = curve_no
|
|
try:
|
|
check_item = None
|
|
with self._request_lock:
|
|
for key, request in self._requests.items():
|
|
if request.loading:
|
|
break
|
|
if request.status[0] != ERROR:
|
|
check_item = key, request
|
|
else:
|
|
# no loading action found -> do check action
|
|
if check_item:
|
|
key, request = check_item
|
|
else:
|
|
request = None
|
|
if request:
|
|
request.action = request.action(request)
|
|
if request.action is not None:
|
|
return
|
|
self.finish_curve(self._requests.pop(key))
|
|
# no more requests pending
|
|
while self._disable_channels:
|
|
ch = self._disable_channels.pop()
|
|
sensor = self._sensors.get(ch)
|
|
if sensor is None or sensor.disabled:
|
|
self.disable_channel(ch)
|
|
except Exception as e:
|
|
request.loading = False
|
|
request.status = ERROR, repr(e)
|
|
# self._requests.pop(key)
|
|
raise
|
|
if not self._requests:
|
|
self.setFastPoll(False)
|
|
|
|
def read_status(self):
|
|
status = IDLE, ''
|
|
for req in self._requests.values():
|
|
if req.loading:
|
|
return req.status
|
|
status = req.status
|
|
return status
|
|
|
|
def get_empty(self):
|
|
"""get curve no of a curve to be reused"""
|
|
if self._empty_curves:
|
|
# we have unused curve slots
|
|
curve_no = next(iter(self._empty_curves))
|
|
self._empty_curves.pop(curve_no)
|
|
else:
|
|
used_no = set(s.curve_no for s in self._sensors.values())
|
|
for req in self._requests.values():
|
|
used_no.add(req.invalidate)
|
|
n0, n = self.user_curves
|
|
n -= n0
|
|
# avoid to take the lower numbers first
|
|
# as then the most recent curves would be overridden
|
|
offset = random.randrange(n)
|
|
for i in range(n):
|
|
curve_no = n0 + (i + offset) % n
|
|
if curve_no not in used_no:
|
|
break
|
|
else:
|
|
raise ValueError('no empty curves available')
|
|
return curve_no
|
|
|
|
def get_calib_state(self, sensor):
|
|
if not self.io.is_connected:
|
|
return ERROR, 'no connection'
|
|
req = self._requests.get(sensor.calcurve)
|
|
if req:
|
|
if req.loading:
|
|
return ImpossibleError(f'can not read T while {req.status[1]}')
|
|
if req.status[0] >= ERROR:
|
|
return ImpossibleError('error while loading calibration curve')
|
|
return None
|
|
|
|
def verify_ends(self, request):
|
|
"""preliminary check: check if the ends of the curve are matching the stored ones"""
|
|
npnt = len(request.points)
|
|
numbers = [1, npnt]
|
|
points = [request.points[0], request.points[npnt-1]]
|
|
if npnt < self.max_curve_length:
|
|
numbers.append(npnt + 1)
|
|
points.append((0, 0))
|
|
for pairs in zip(points, self.get_crvpts(request.curve_no, *numbers)):
|
|
if not self.is_equal(*pairs):
|
|
return False
|
|
return True
|
|
|
|
def find_curve(self, request):
|
|
"""try to find curve and return required action
|
|
|
|
:param request: the curve request
|
|
:return: next action
|
|
"""
|
|
no = self._curve_map.get(request.crvhdr.key)
|
|
if no:
|
|
request.crvhdr = self.curve_cache[no]
|
|
request.set_curve_no(no)
|
|
if self.verify_ends(request):
|
|
self.log.info('calcurve #%d %s found, start to check consistency', no, request.crvhdr)
|
|
# guess the curve is o.k. -> install
|
|
request.install_sensors(request.sensors.values())
|
|
request.pointer = 1
|
|
request.loading = False
|
|
request.status = IDLE, 'checking calcurve'
|
|
for sensor in request.sensors.values():
|
|
sensor.pollInfo.trigger()
|
|
return self.check_points
|
|
request.invalidate = no
|
|
request.set_curve_no(self.get_empty())
|
|
request.install_sensors(request.sensors.values())
|
|
self.log.info('%s found, but content has changed, create #%d',
|
|
request.crvhdr.sn, request.curve_no)
|
|
return self.start_load
|
|
request.set_curve_no(self.get_empty())
|
|
for sensor in request.sensors.values():
|
|
sensor.install_sensor()
|
|
self.log.info('load curve #%d %s', request.curve_no, request.crvhdr)
|
|
return self.start_load
|
|
|
|
def check_points(self, request):
|
|
"""check next point
|
|
|
|
:param request: the curve request
|
|
:return: next action
|
|
"""
|
|
if request.pointer >= len(request.points):
|
|
return None # finish
|
|
if request.new_sensors:
|
|
# install sensors for the same curve added in the meantime
|
|
sensors = request.new_sensors
|
|
request.new_sensors = set()
|
|
request.install_sensors(sensors)
|
|
given = request.points[request.pointer:request.pointer + self.cmds_per_line]
|
|
first = request.pointer + 1
|
|
no = request.curve_no
|
|
for n, (pt, ptg) in enumerate(zip(self.get_crvpts(no, *range(first, first + len(given))), given)):
|
|
if not self.is_equal(pt, ptg):
|
|
self.log.info('reply (%g, %g) does not match given point %d (%g,%g)',
|
|
*pt, first + n, *ptg)
|
|
request.invalidate = no
|
|
request.set_curve_no(self.get_empty())
|
|
self.log.info('%s has changed, create #%d', request.crvhdr.sn, request.curve_no)
|
|
request.loading = True
|
|
return self.start_load
|
|
|
|
request.pointer += self.cmds_per_line
|
|
return self.check_points
|
|
|
|
def start_load(self, request):
|
|
"""start loading a curve
|
|
|
|
:param request: the curve request
|
|
:return: next action
|
|
"""
|
|
request.status = BUSY, 'loading calcurve'
|
|
# delete first to make sure there is nothing left after the loaded points
|
|
self.command(f'CRVDEL {request.curve_no}')
|
|
# write a temporary header
|
|
# (in case loading will not finish, the curve should be marked as empty)
|
|
header = list(request.crvhdr)
|
|
self.put_header(request.curve_no, 'loading...', '', *header[2:])
|
|
request.pointer = 0
|
|
return self.load_points
|
|
|
|
def load_points(self, request):
|
|
"""load the next point(s)
|
|
|
|
:param request: the curve request
|
|
:return: next action
|
|
"""
|
|
try:
|
|
given = request.points[request.pointer:request.pointer+self.cmds_per_line]
|
|
first = request.pointer + 1
|
|
cmds = [f'CRVPT {request.curve_no},{first + n},{x:g},{y:g};'
|
|
f'CRVPT?{request.curve_no},{first + n}' for n, (x, y) in enumerate(given)]
|
|
for n, (reply, pt) in enumerate(zip(self.communicate(';'.join(cmds)).split(';'), given)):
|
|
if not self.is_equal([float(r) for r in reply.split(',')], pt):
|
|
# self.log.error('reply %r does not match given point %d (%g,%g)'
|
|
# '-> please check for values outside allowed range', reply, first + n, *pt)
|
|
raise HardwareError(f'reply {reply.strip()} does not match given point '
|
|
f'{first + n} {pt[0]:g},{pt[1]:g}'
|
|
'-> please check for values outside allowed range')
|
|
request.pointer += self.cmds_per_line
|
|
return None if request.pointer >= len(request.points) else self.load_points
|
|
except Exception as e:
|
|
self.log.exception('error in load_points %s', e)
|
|
raise
|
|
|
|
def finish_curve(self, request):
|
|
"""write header and assign to channel(s)
|
|
|
|
:param request: the curve request
|
|
:return: next action
|
|
"""
|
|
self.curve_cache[request.curve_no] = CurveHeader(*request.crvhdr)
|
|
self._curve_map[request.crvhdr.key] = request.curve_no
|
|
self.put_header(request.curve_no, *request.crvhdr)
|
|
request.install_sensors(request.sensors.values())
|
|
no = request.invalidate
|
|
comment = 'finished' if request.loading else 'verified'
|
|
if no:
|
|
self.log.info('calcurve #%d %s %s, clear previous #%d',
|
|
request.curve_no, request.crvhdr, comment, no)
|
|
self.curve_cache[no] = CurveHeader()
|
|
self._empty_curves[no] = None
|
|
else:
|
|
self.log.info('calcurve #%d %s %s',
|
|
request.curve_no, request.crvhdr, comment)
|
|
request.status = None
|
|
for sensor in request.sensors.values():
|
|
try:
|
|
sensor.pollInfo.trigger()
|
|
except Exception:
|
|
pass
|
|
|
|
def put_header(self, curve_no, name, sn, fmt, limit, coef):
|
|
"""write header"""
|
|
self.communicate(f'CRVHDR {curve_no},"{name}","{sn}",{fmt},{limit},{coef};*OPC?')
|
|
|
|
def is_equal(self, left, right, fixeps=(1.1e-5, 1.1e-5), significant=6):
|
|
"""check whether a returned calibration point is equal within curve point precision"""
|
|
for v1, v2, eps in zip(left, right, fixeps):
|
|
try:
|
|
assert_approx_equal(v1, v2, significant, verbose=False)
|
|
except AssertionError:
|
|
return abs(v1 - v2) < eps
|
|
return True
|
|
|
|
def get_crvpts(self, curve_no, *numbers):
|
|
"""read curve points
|
|
|
|
:param curve_no: curve number
|
|
:param numbers: indices of points to retrieve
|
|
:return: list of points (x,y)
|
|
"""
|
|
replies = []
|
|
for i in range(0, len(numbers), self.cmds_per_line):
|
|
cmds = [f'CRVPT?{curve_no},{n}' for n in numbers[i:i+self.cmds_per_line]]
|
|
replies.extend(self.communicate(';'.join(cmds)).split(';'))
|
|
return [[float(v) for v in xy.split(',')] for xy in replies]
|
|
|
|
def get_headers(self):
|
|
"""get all headers
|
|
|
|
for performance reasons, by default 5 headers are read in one line
|
|
this is speeing up quite a lot on serial connections
|
|
"""
|
|
n0, n1 = self.user_curves
|
|
result = {}
|
|
for ni in range(n0, n1, self.cmds_per_line):
|
|
cmd = ';'.join(f'CRVHDR?{n}' for n in range(ni, min(n1, ni + self.cmds_per_line)))
|
|
for i, reply in enumerate(self.communicate(cmd).split(';')):
|
|
result[ni + i] = reply
|
|
return result
|
|
|
|
def disable_channel(self, channel):
|
|
self.command(f'INTYPE {channel},0')
|
|
|
|
|
|
class CurveRequest:
|
|
invalidate = None
|
|
loading = True
|
|
status = BUSY, 'looking up calcurve'
|
|
|
|
def __init__(self, sensor, device):
|
|
self.action = device.find_curve
|
|
self.new_sensors = set()
|
|
self.sensors = {sensor.channel: sensor}
|
|
calcurve = CalCurve(sensor.calcurve)
|
|
equipment_id = device.propertyValues.get('original_id') or device.secNode.equipment_id
|
|
name = f"{equipment_id.split('.')[0]}.{sensor.name}"
|
|
sn = calcurve.calibname
|
|
limit = calcurve.calibrange[1]
|
|
unit = calcurve.options.get('unit', 'Ohm')
|
|
logformat, range_limit = sensor.get_curve_type(calcurve)
|
|
self.points = calcurve.export(logformat, device.max_curve_length)
|
|
if range_limit:
|
|
x0, x1 = range_limit
|
|
if x0 > calcurve.xrange[0] or calcurve.xrange[1] > x0:
|
|
# we do not limit in the first place already, because of the following warning
|
|
if calcurve.xrange[1] > x1:
|
|
device.log.warn('curve %s cut at %g %s (upper end was %g %s)',
|
|
sn, x1, unit, calcurve.xrange[1], unit)
|
|
if x0 > calcurve.xrange[0]:
|
|
device.log.warn('curve %s cut at %g %s (lower end was %g %s)',
|
|
sn, x0, unit, calcurve.xrange[0], unit)
|
|
self.points = calcurve.export(logformat, device.max_curve_length, xlimits=range_limit)
|
|
ymax = max(self.points[0, 1], self.points[-1, 1])
|
|
limit = min(limit, 10 ** ymax if calcurve.logformat[1] else ymax)
|
|
device.log.info('exported %d points from %s', len(self.points), calcurve.filename)
|
|
if unit == 'Ohm':
|
|
if calcurve.logformat[1]:
|
|
fmt = 5
|
|
elif calcurve.logformat[0]:
|
|
fmt = 4
|
|
else:
|
|
fmt = 3
|
|
else:
|
|
fmt = 2 if unit == 'V' else 1
|
|
coef = 1 + (self.points[0][1] < self.points[-1][1]) # 1: ntc, 2: ptc
|
|
self.crvhdr = CurveHeader(name, sn, fmt, limit, coef)
|
|
|
|
def set_curve_no(self, curve_no):
|
|
self.curve_no = curve_no
|
|
for s in self.sensors.values():
|
|
s.curve_no = curve_no
|
|
|
|
def add_sensor(self, sensor):
|
|
self.sensors[sensor.channel] = sensor
|
|
self.new_sensors.add(sensor)
|
|
|
|
def install_sensors(self, sensors=None):
|
|
for sensor in sensors:
|
|
sensor.install_sensor()
|
|
sensor.install_curve()
|
|
|
|
|
|
class CurveHeader(tuple):
|
|
"""curve header class
|
|
|
|
carries metadata of curves
|
|
"""
|
|
def __new__(cls, name='', sn='', fmt=0, limit=0, coef=0):
|
|
"""initialize curve header. convert from string if needed
|
|
|
|
:param name: a name
|
|
:param sn: the serial 'number' - is a string as it contains letters
|
|
:param fmt: 1..5 (mV,K), (V,K), (Ohm,K), (log Ohm, K), (log Ohm, log K)
|
|
:param limit: setpoint limit (K)
|
|
:param coef: 1, 2 (negative, positive)
|
|
"""
|
|
return tuple.__new__(cls, (name.strip(), sn.strip(), int(fmt), float(limit), int(coef)))
|
|
|
|
@property
|
|
def name(self):
|
|
return self[0]
|
|
|
|
@property
|
|
def sn(self):
|
|
return self[1]
|
|
|
|
@property
|
|
def fmt(self):
|
|
return self[2]
|
|
|
|
@property
|
|
def limit(self):
|
|
return self[3]
|
|
|
|
@property
|
|
def coef(self):
|
|
return self[4]
|
|
|
|
@property
|
|
def key(self):
|
|
"""key for curve lookup"""
|
|
if self.sn:
|
|
return self.sn.lower(), self.fmt, self.coef
|
|
return None
|
|
|
|
|
|
def get_cfg_value(cfgdict, key, default=object):
|
|
# get value from a cfgdict
|
|
param = cfgdict.get(key)
|
|
try:
|
|
return param['value']
|
|
except TypeError:
|
|
# param is not a dict
|
|
return param
|
|
except KeyError:
|
|
if type(param).__name__ != 'Param':
|
|
return param
|
|
if default is object:
|
|
raise KeyError(f'cfg dict does not contain {key!r}')
|
|
return default
|
|
|
|
|
|
class Base(HasLscIO):
|
|
model = None
|
|
device = Attached(Device, mandatory=False) # device is not needed without curve handling
|
|
|
|
def initModule(self):
|
|
if self.device and not self.io:
|
|
self.io = self.device.io.name
|
|
super().initModule()
|
|
|
|
|
|
class Sensor(Base, Readable):
|
|
"""base class for sensors"""
|
|
value = Parameter(unit='K')
|
|
status = Parameter(datatype=StatusType(Readable, 'DISABLED', 'BUSY'))
|
|
raw = Parameter('raw sensor value', datatype=FloatRange(unit='Ohm'), default=0)
|
|
channel = Property('used channel', StringType()) # input
|
|
calcurve = Parameter('calibration curve name', StringType(), readonly=False)
|
|
enabled = Parameter('enable flag', BoolType(), readonly=False)
|
|
classes = {} # dict <model> of class
|
|
curve_no = None
|
|
STATUS_BIT_LABELS = 'invalid_reading old_reading b2 b3 t_under t_over units_zero units_overrange'.split()
|
|
_read_error = None
|
|
_raw_error = None
|
|
_do_read = True
|
|
_curve_handling = False # True when a device is present and device.curve_handling is True
|
|
# TODO: implement alarms
|
|
|
|
def initModule(self):
|
|
super().initModule()
|
|
if self.device:
|
|
self._curve_handling = self.device.curve_handling
|
|
if not self._curve_handling:
|
|
self.parameters['calcurve'].setProperty('export', False)
|
|
|
|
def doPoll(self):
|
|
self.read_status() # polls also value and raw
|
|
|
|
@nopoll
|
|
def read_value(self):
|
|
self.status, value, self.raw = self.get_data()
|
|
if isinstance(value, SECoPError):
|
|
raise value.copy()
|
|
return value
|
|
|
|
@nopoll
|
|
def read_raw(self):
|
|
self.status, self.value, raw = self.get_data()
|
|
if isinstance(raw, SECoPError):
|
|
raise raw.copy()
|
|
return raw
|
|
|
|
def read_status(self):
|
|
try:
|
|
status, self.value, self.raw = self.get_data()
|
|
except Exception as e:
|
|
self.raw = self.value = secop_error(e)
|
|
raise
|
|
return status
|
|
|
|
def get_data(self):
|
|
"""get reading status, raw and Kelvin value with minimal delay"""
|
|
status = IDLE, ''
|
|
if self.enabled:
|
|
ch = self.channel
|
|
rdgst, raw, value = self.query(f'RDGST?{ch};SRDG?{ch};KRDG?{ch}', int, float, float)
|
|
rdgst &= 0xfd # suppress old reading
|
|
if rdgst:
|
|
statuslist = formatStatusBits(rdgst, self.STATUS_BIT_LABELS)
|
|
status = ERROR, statuslist[-1] # show only the most fatal error
|
|
value = HardwareError(statuslist[-1])
|
|
while statuslist and statuslist[-1].startswith('t_'):
|
|
statuslist.pop()
|
|
if statuslist:
|
|
raw = HardwareError(statuslist[-1])
|
|
elif self._curve_handling:
|
|
value_error = self.device.get_calib_state(self)
|
|
if value_error:
|
|
value = value_error
|
|
status = ERROR, str(value_error)
|
|
else:
|
|
raw = value = DisabledError('disabled')
|
|
status = DISABLED, 'disabled'
|
|
self.enable = False
|
|
return status, value, raw
|
|
|
|
def write_calcurve(self, calibname):
|
|
if not self._curve_handling:
|
|
raise ConfigError('curve handling is off')
|
|
self.enabled = True
|
|
self.device.curve_request(self)
|
|
|
|
def write_enabled(self, value):
|
|
if self._curve_handling:
|
|
if value:
|
|
self.write_calcurve(self.calcurve)
|
|
else:
|
|
self.device.disable_channel(self.channel)
|
|
|
|
def read_enabled(self):
|
|
if not self.query(f'INTYPE?{self.channel}', int):
|
|
return False
|
|
return self.enabled
|
|
|
|
def get_curve_type(self, calcurve):
|
|
unit = calcurve.options.get('unit', 'Ohm')
|
|
logformat = False
|
|
range_limit = (1e-5, 9999.99)
|
|
if unit == 'Ohm':
|
|
if calcurve.ptc:
|
|
# <sensor type>,<autorange>,<range>,<compensation>,<units>
|
|
self.intype = 2, 1, 6, 1, 1 # PTC
|
|
else:
|
|
range_limit = (1e-5, 99999.9)
|
|
logformat = True, False
|
|
self.intype = 3, 1, 8, 1, 1 # NTC
|
|
elif unit == 'V': # diode
|
|
self.intype = (1, 1, 0, 0, 1) if calcurve.xscale[1] <= 2.5 else (1, 1, 1, 0, 1)
|
|
else: # thermocouple
|
|
self.intype = (4, 0, 0, 1, 1)
|
|
return logformat, range_limit
|
|
|
|
def install_sensor(self):
|
|
if self.query(f'INTYPE?{self.channel}', *(type(v) for v in self.intype)) != self.intype:
|
|
self.command(f'INTYPE {self.channel}', *self.intype)
|
|
|
|
def install_curve(self):
|
|
if self.query(f'INCRV?{self.channel}', int) != self.curve_no:
|
|
self.command(f'INCRV {self.channel}', self.curve_no)
|
|
|
|
|
|
class Output(Base, HasControlledBy, Writable):
|
|
"""base class for heater output"""
|
|
classes = {} # dict <model> of dict <output_no> of class
|
|
value = Parameter('heater output', FloatRange(0, 100, unit='W'))
|
|
target = Parameter('manual heater output', FloatRange(0, 100, unit='W'), default=0)
|
|
htr = Parameter('heater output current percentage', FloatRange(0, 100, unit='%'))
|
|
max_heater = Parameter('desired max. heater with unit W, A or V', StringType(), readonly=False)
|
|
max_power = Parameter('''max heater power\n
|
|
value will be clamped to allowed range and may be rounded to discrete values
|
|
''', FloatRange(0, 100, unit='W'), readonly=False)
|
|
# for the case when several outputs are possible:
|
|
output_no = Parameter('lakeshore output or loop number', IntRange(1, 4), default=1)
|
|
resistance = Parameter('heater resistance', FloatRange(10, 100), readonly=False, default=25)
|
|
Extension = None
|
|
AMP_VOLT_WATT = NumberWithUnit('A', 'V', 'W')
|
|
imax = None
|
|
vmax = None
|
|
n_ranges = None
|
|
n_currents = None
|
|
_power_scale = 1e-4
|
|
# sorted_factors: list of possible current squares (I^2, idx)
|
|
# (multiply I^2 with resistance to get power)
|
|
sorted_factors = None
|
|
errorstatus = None
|
|
_desired_max_power = None
|
|
_control_loop = None # a loop module object when controlled, None when in manual mode
|
|
power_offset = 0 # offset for closed_loop
|
|
|
|
def configure(self):
|
|
"""configure the output
|
|
|
|
based on self._control_loop, self.resistance and self._desired_max_power
|
|
"""
|
|
raise NotImplementedError
|
|
|
|
def set_closed_loop(self, loop):
|
|
"""set to control mode
|
|
|
|
:param loop: the temperature loop module
|
|
"""
|
|
if self._control_loop != loop:
|
|
self.log.info(f'set output to be controlled on channel {loop.channel}')
|
|
self._control_loop = loop
|
|
self.configure()
|
|
else:
|
|
self.fix_heater_range()
|
|
|
|
def set_open_loop(self):
|
|
"""set to open loop"""
|
|
if self._control_loop is not None:
|
|
self.log.info('put output into manual mode')
|
|
self._control_loop = None
|
|
self.configure()
|
|
else:
|
|
self.fix_heater_range()
|
|
|
|
def fix_heater_range(self):
|
|
"""switch on heater range, if off"""
|
|
|
|
def write_output_no(self, output_no):
|
|
if self._desired_max_power is not None:
|
|
self.output_no = output_no # early update needed by configure()
|
|
self.configure()
|
|
|
|
def write_resistance(self, resistance):
|
|
if self._desired_max_power is not None:
|
|
self.resistance = resistance # early update needed by configure()
|
|
self.configure()
|
|
|
|
def get_best_power_idx(self, max_power, round_down_limit=1.0001):
|
|
"""get best index from sorted_factors to match given max_current"""
|
|
prev_pwr = 0
|
|
prev_idx = None
|
|
for fact, idx in self.sorted_factors:
|
|
pwr = min(fact * self.resistance, self.vmax ** 2 / self.resistance)
|
|
if pwr >= max_power:
|
|
if pwr == prev_pwr:
|
|
# happens when we reached voltage compliance limit
|
|
return prev_idx
|
|
if max_power - prev_pwr < pwr - max_power and max_power < prev_pwr * round_down_limit:
|
|
# previous value is closer and not above round down limit
|
|
return prev_idx
|
|
return idx
|
|
prev_idx = idx
|
|
return prev_idx
|
|
|
|
def calc_power(self, percent):
|
|
# power is limited by voltage compliance
|
|
return min(percent ** 2 * self._power_scale * self.resistance,
|
|
self.vmax ** 2 / self.resistance)
|
|
|
|
def calc_percent(self, power):
|
|
return min(100., math.sqrt(power / self.resistance / self._power_scale))
|
|
|
|
def read_status(self):
|
|
"""get heater status"""
|
|
return IDLE, ''
|
|
|
|
def put_manual_power(self, value):
|
|
self.command(f'MOUT {self.output_no}', value)
|
|
|
|
def read_value(self):
|
|
return self.calc_power(self.read_htr())
|
|
|
|
def read_htr(self):
|
|
"""read heater percent"""
|
|
raise NotImplementedError
|
|
|
|
def write_max_power(self, max_power):
|
|
self._desired_max_power = max_power
|
|
self.configure()
|
|
return self.calc_power(100)
|
|
|
|
def write_max_heater(self, value):
|
|
number, unit = self.AMP_VOLT_WATT.parse(value)
|
|
pmax = self['target'].datatype.max
|
|
if unit == 'A':
|
|
power = self.write_max_power(min(pmax, number ** 2 * self.resistance))
|
|
number = math.sqrt(power / self.resistance)
|
|
elif unit == 'V':
|
|
power = self.write_max_power(min(pmax, number ** 2 / self.resistance))
|
|
number = math.sqrt(power * self.resistance)
|
|
else:
|
|
number = self.write_max_power(min(pmax, number))
|
|
return format_with_unit(number, unit, 3)
|
|
|
|
def write_target(self, target):
|
|
self.self_controlled()
|
|
self.put_manual_power(target)
|
|
|
|
|
|
class MainOutput(Output):
|
|
"""power output with adjustable power
|
|
|
|
including common code for main output(s) on 336 / 350
|
|
"""
|
|
|
|
HTRST_MAP = {
|
|
0: (IDLE, ''),
|
|
1: (ERROR, 'Open heater load'),
|
|
2: (ERROR, 'Heater short')
|
|
}
|
|
|
|
_htr_range = 1
|
|
heater_ranges = {5 - i: 10 ** -i for i in range(5)}
|
|
sorted_factors = sorted((v, i) for i, v in heater_ranges.items())
|
|
|
|
def read_status(self):
|
|
st = self.query(f'HTRST? {self.output_no}', int)
|
|
return self.HTRST_MAP[st]
|
|
|
|
def configure(self):
|
|
if self._desired_max_power is None:
|
|
self.log.info(f'max_heater {self.writeDict} {self.max_heater}')
|
|
self.write_max_heater(self.max_heater)
|
|
self._htr_range = irng = self.get_best_power_idx(self._desired_max_power)
|
|
user_current = max(0.1, min(self.imax, 2 * math.sqrt(self._desired_max_power /
|
|
self.heater_ranges[irng] / self.resistance)))
|
|
self._power_scale = user_current ** 2 * self.heater_ranges[irng] / 1e4
|
|
self.command(f'HTRSET {self.output_no}', 1 if self.resistance < 50 else 2, 0, user_current, 1)
|
|
# self.command(f'CDISP {self.output_no}', 1, self.resistance, 1, 0)
|
|
if self._control_loop is None:
|
|
mode = self.query(f'OUTMODE?{self.output_no}', int)
|
|
if mode != 3: # open loop
|
|
self.command(f'OUTMODE {self.output_no}', 3) # control off
|
|
# self.command(f'OUTMODE {self.output_no}', 3, self.channel, 0) # control off
|
|
self.command(f'RANGE {self.output_no}', self._htr_range)
|
|
self.put_manual_power(self.target)
|
|
else:
|
|
self.command(f'OUTMODE {self.output_no}', 1, self._control_loop.channel, 0) # control on
|
|
self.command(f'RANGE {self.output_no}', self._htr_range)
|
|
self.put_manual_power(self._control_loop.power_offset)
|
|
|
|
def read_max_power(self):
|
|
curidx, self._user_current = self.query(f'HTRSET? {self.output_no}', int, int, float)[1:3]
|
|
if not self._user_current:
|
|
self._user_current = 2.0 ** (curidx * 0.5 - 1)
|
|
self._htr_range = self.query(f'RANGE?{self.output_no}', int) or self._htr_range
|
|
self._power_scale = self.imax ** 2 * self.heater_ranges[self._htr_range] / 1e4
|
|
return self.calc_power(100)
|
|
|
|
def fix_heater_range(self):
|
|
# switch heater range on, if needed
|
|
irng = self.query(f'RANGE?{self.output_no}', int)
|
|
if irng != self._htr_range:
|
|
if irng:
|
|
self.log.info('output range was changed manually')
|
|
self._htr_range = irng
|
|
else:
|
|
self.log.info('output was off - switch on again')
|
|
self.command(f'RANGE {self.output_no}', self._htr_range)
|
|
|
|
def read_htr(self):
|
|
return self.query(f'HTR? {self.output_no}', float)
|
|
|
|
|
|
class AnalogOutput(Output):
|
|
"""low power (max. 1W) output"""
|
|
output_no = Parameter(datatype=IntRange(3, 4), default=3)
|
|
resistance = Parameter(default=100)
|
|
max_power = Parameter(readonly=True)
|
|
max_heater = Parameter(readonly=True)
|
|
vmax = 10
|
|
imax = 0.1
|
|
|
|
def read_max_power(self):
|
|
# max power can not be changed
|
|
# -> the return value depends on the resistance only
|
|
self._power_scale = self.vmax ** 2 / 1e4
|
|
return self.calc_power(100)
|
|
|
|
def write_max_power(self, value):
|
|
# max power is not configurable
|
|
return self.read_max_power()
|
|
|
|
def write_output_no(self, output_no):
|
|
self.output_no = output_no
|
|
self.configure()
|
|
|
|
def calc_power(self, percent):
|
|
# power is limited by max current
|
|
return min(percent ** 2 * self._power_scale / self.resistance,
|
|
self.imax ** 2 / self.resistance)
|
|
|
|
def configure(self):
|
|
if self._control_loop is None:
|
|
# 3: open loop, 0: powerup enable off
|
|
self.command(f'OUTMODE {self.output_no}', 3, 0, 0)
|
|
self.put_manual_power(self.target)
|
|
else:
|
|
# 1: closed loop, 0: powerup enable off
|
|
self.command(f'OUTMODE {self.output_no}', 1, self._control_loop.channel, 0)
|
|
self.put_manual_power(self._control_loop.power_offset)
|
|
|
|
def read_htr(self):
|
|
return self.query(f'AOUT?{self.output_no}', float)
|
|
|
|
|
|
class Loop(HasConvergence, HasOutputModule, Sensor):
|
|
output_module = Attached(Output)
|
|
# this creates individual parameters pid_p, pid_i and pid_d automatically
|
|
ctrlpars = StructParam('ctrlpars struct', {
|
|
'p': Parameter('control parameter p', FloatRange(0, 1000)),
|
|
'i': Parameter('control parameter i', FloatRange(0, 1000)),
|
|
'd': Parameter('control parameter d', FloatRange(0, 1000)),
|
|
}, prefix='pid_', readonly=False)
|
|
power_offset = Parameter('power offset\n\n(using LakeShores Manual Output)',
|
|
FloatRange(0, 100, unit='W'), readonly=False, default=0)
|
|
classes = {}
|
|
|
|
def __init_subclass__(cls):
|
|
super().__init_subclass__()
|
|
if cls.model: # do not register abstract base classes
|
|
cls.model = str(cls.model)
|
|
cls.classes[cls.model] = cls
|
|
|
|
def write_ctrlpars(self, pid):
|
|
pid = self.command(f'PID {self.output_module.output_no}', *[pid[k] for k in 'pid'])
|
|
return dict(zip('pid', pid))
|
|
|
|
def read_ctrlpars(self):
|
|
pid = self.query(f'PID?{self.output_module.output_no}', float, float, float)
|
|
return dict(zip('pid', pid))
|
|
|
|
def write_target(self, value):
|
|
sensor_status = Sensor.read_status(self)
|
|
if sensor_status[0] >= BUSY:
|
|
raise ImpossibleError(f'can not control while status is {sensor_status}')
|
|
self.activate_control()
|
|
self.output_module.set_closed_loop(self)
|
|
self.set_target(value)
|
|
|
|
def write_power_offset(self, value):
|
|
if self.control_active:
|
|
self.output_module.put_manual_power(value)
|
|
|
|
def read_target(self):
|
|
return self.get_target()
|
|
|
|
def set_target(self, value):
|
|
return self.command(f'SETP {self.output_module.output_no}', float(value))
|
|
|
|
def get_target(self):
|
|
return self.query(f'SETP?{self.output_module.output_no}', float)
|
|
|
|
|
|
# --- MODEL 340 ---
|
|
|
|
class IO340(IO):
|
|
timeout = 5 # needed for INCRV command
|
|
model = 340
|
|
end_of_line = '\r' # default at SINQ. TODO: remove
|
|
|
|
|
|
class Device340(Device):
|
|
ioClass = IO340
|
|
model = 340
|
|
channels = 'ABCD'
|
|
user_curves = (21, 61) # the last curve is 60
|
|
log_formats = True, True # log Ohm / log K is supported
|
|
_crvsav_deadline = None
|
|
|
|
def disable_channel(self, channel):
|
|
self.communicate(f'INSET {channel},0;*OPC?')
|
|
|
|
def finish_curve(self, request):
|
|
super().finish_curve(request)
|
|
if request.loading and not self._crvsav_deadline:
|
|
# when loading a new curve, remember for sending CRVSAV later
|
|
self._crvsav_deadline = time.time() + 600
|
|
|
|
def put_header(self, curve_no, name, sn, fmt, limit, coef):
|
|
self.communicate(f'CRVHDR {curve_no},{name:15s},{sn:10s},{fmt},{limit},{coef};*OPC?')
|
|
|
|
def doPoll(self):
|
|
super().doPoll()
|
|
if self._crvsav_deadline:
|
|
# prevent flashing multiple times within short time
|
|
if time.time() > self._crvsav_deadline:
|
|
self._crvsav_deadline = None
|
|
self.communicate('CRVSAV;*OPC?')
|
|
|
|
def is_equal(self, left, right, fixeps=(1.1e-5, 1.1e-4), significant=6):
|
|
# for whatever reason, the number of digits after decimal point for the T column is only 4
|
|
return super().is_equal(left, right, fixeps, significant)
|
|
|
|
|
|
class Sensor340(Sensor):
|
|
model = 340
|
|
intype = None # arguments for the intype command
|
|
|
|
def get_curve_type(self, calcurve):
|
|
unit = calcurve.options.get('unit', 'Ohm')
|
|
logformat = False
|
|
range_limit = None
|
|
if unit == 'Ohm':
|
|
if calcurve.ptc:
|
|
xlim = calcurve.xrange[1] * 0.001 # 1 mA excitation
|
|
for rng, limit in enumerate(
|
|
[0, 0.001, 0.0025, 0.005, 0.01, 0.025, 0.05,
|
|
0.1, 0.25, 0.5, 1, 2.5, 5]):
|
|
if xlim <= limit:
|
|
break
|
|
else:
|
|
rng = 13
|
|
# we use special type here, in order to allow thermal compensation
|
|
# <type>, <units>, <coefficient>, <excitation>, <range>
|
|
self.intype = 0, 2, 2, 10, rng
|
|
else:
|
|
if calcurve.options.get('type') == 'GE':
|
|
self.intype = (10,)
|
|
else:
|
|
self.intype = (8,) # carbon and ruox are equivalent to cernox(8)
|
|
logformat = True, True
|
|
elif unit == 'V': # diode
|
|
self.intype = (1,) if calcurve.xscale[1] < 2.5 else (2,)
|
|
else: # thermocouple
|
|
self.intype = (12,)
|
|
return logformat, range_limit
|
|
|
|
def install_sensor(self):
|
|
super().install_sensor()
|
|
if self.query(f'INSET?{self.channel}', int, int) != (1, 1):
|
|
self.command(f'INSET {self.channel}', 1, 1)
|
|
|
|
|
|
class Loop340(Loop, Sensor340):
|
|
pass
|
|
|
|
|
|
class MainOutput340(MainOutput):
|
|
model = 340
|
|
output_no = Parameter(datatype=IntRange(1, 1))
|
|
HTRST_MAP = {
|
|
0: (IDLE, ''),
|
|
1: (ERROR, 'Power supply over voltage'),
|
|
2: (ERROR, 'Power supply under voltage'),
|
|
3: (ERROR, 'Output digital-to-analog Converter error'),
|
|
4: (ERROR, 'Current limit digital-to-analog converter error'),
|
|
5: (ERROR, 'Open heater load'),
|
|
6: (ERROR, 'Heater load less than 10 ohms')
|
|
}
|
|
_manual_output = 0.0 # TODO: check how to set this
|
|
vmax = 50
|
|
imax = 2
|
|
max_currents = {4 - i: 2 ** (1 - i) for i in range(4)}
|
|
SETPOINTLIMS = 1500.0
|
|
sorted_factors = sorted([(fhtr * fcur ** 2, (i, h))
|
|
for i, fcur in max_currents.items()
|
|
for h, fhtr in MainOutput.heater_ranges.items()
|
|
])
|
|
|
|
def get_status(self):
|
|
st = self.query(f'HTRST?', int)
|
|
return self.HTRST_MAP[st]
|
|
|
|
def configure(self):
|
|
icurrent, htr_range = self.get_best_power_idx(self._desired_max_power, 1.1)
|
|
self._power_scale = self.max_currents[icurrent] ** 2 * self.heater_ranges[htr_range] / 1e4
|
|
self.command(f'CLIMIT {self.output_no}', self.SETPOINTLIMS, 0.0, 0.0, icurrent, htr_range)
|
|
self._htr_range = htr_range
|
|
if self._control_loop is None:
|
|
mode = self.query(f'CMODE?{self.output_no}', int)
|
|
if mode != 3: # open loop
|
|
self.command(f'CSET {self.output_no}', '0', 1, 0, 0) # control off
|
|
self.command(f'CMODE {self.output_no}', 3) # open loop
|
|
self.command('RANGE', self._htr_range)
|
|
self.put_manual_power(self._manual_output)
|
|
else:
|
|
self.command(f'CSET {self.output_no}', self._control_loop.channel, 1, 1, 0) # control on
|
|
self.command(f'CMODE {self.output_no}', 1) # pid
|
|
self.command('RANGE', self._htr_range)
|
|
self.put_manual_power(self._control_loop.power_offset)
|
|
self.command(f'CDISP {self.output_no}', 1, self.resistance, 1, 0)
|
|
|
|
def fix_heater_range(self):
|
|
# switch heater range on, if needed
|
|
irng = self.query('RANGE?', int)
|
|
if irng != self._htr_range:
|
|
if irng:
|
|
self.log.info('output range was changed manually')
|
|
self._htr_range = irng
|
|
else:
|
|
self.log.info('output was off - switch on again')
|
|
self.command('RANGE', self._htr_range)
|
|
|
|
def read_max_power(self):
|
|
icurrent, htr_range = self.query(f'CLIMIT? {self.output_no}', float, float, float, int, int)[3:]
|
|
htr_range = self.query('RANGE?', int) or htr_range
|
|
self._htr_range = htr_range
|
|
self._power_scale = self.max_currents[icurrent] ** 2 * self.heater_ranges[htr_range] / 1e4
|
|
return self.calc_power(100)
|
|
|
|
def read_htr(self):
|
|
return self.query('HTR?', float)
|
|
|
|
|
|
class AnalogOutput340(AnalogOutput):
|
|
model = 340
|
|
output_no = Parameter(datatype=IntRange(2, 2), default=2)
|
|
|
|
def configure(self):
|
|
if self._control_loop is not None:
|
|
self.put_manual_power(self.target)
|
|
else:
|
|
self.command('ANALOG 2', 0, 3, self._control_loop.channel)
|
|
self.put_manual_power(self._control_loop.power_offset)
|
|
|
|
def put_manual_power(self, value):
|
|
if self._control_loop is None:
|
|
self.command('ANALOG 2', 0, 2, 0, 0, 0, 0, 0, self.calc_percent(value))
|
|
else:
|
|
self.command(f'MOUT {self.output_no}', self.calc_percent(value))
|
|
|
|
|
|
# --- MODELS 336, 350, 224, ...
|
|
class Device2(Device):
|
|
"""second generation LakeShore models"""
|
|
channels = 'ABCD'
|
|
user_curves = (21, 60) # the last curve is 59
|
|
max_raw_unit = 99999
|
|
TYPES = {'DT': 1, 'TG': 1, 'PT': 2, 'RF': 2, 'CX': 3, 'RX': 3, 'CC': 3, 'GE': 3, 'TC': 4}
|
|
|
|
|
|
# --- MODEL 336 ---
|
|
|
|
class IO336(IO):
|
|
model = 336
|
|
|
|
|
|
class Device336(Device2):
|
|
model = 336
|
|
|
|
|
|
class Sensor336(Sensor):
|
|
model = 336
|
|
|
|
|
|
class Loop336(Loop, Sensor336):
|
|
pass
|
|
|
|
|
|
class MainOutput336(MainOutput):
|
|
model = 336
|
|
output_no = Parameter(datatype=IntRange(1, 1))
|
|
imax = 2
|
|
vmax = 50
|
|
# 3 ranges only
|
|
heater_ranges = {3 - i: 10 ** -i for i in range(3)}
|
|
sorted_factors = sorted((v, i) for i, v in heater_ranges.items())
|
|
|
|
|
|
class SecondaryOutput336(MainOutput336):
|
|
model = 336
|
|
output_no = Parameter(datatype=IntRange(2, 2))
|
|
imax = 1.414
|
|
vmax = 35.4
|
|
max_power = Parameter(datatype=FloatRange(0, 50, unit='W'))
|
|
|
|
|
|
class AnalogOutput336(AnalogOutput):
|
|
model = 336
|
|
output_no = Parameter(datatype=IntRange(3, 4))
|
|
|
|
|
|
# --- MODEL 350 ---
|
|
|
|
class Device350(Device2):
|
|
model = 350
|
|
|
|
|
|
class Sensor350(Sensor):
|
|
model = 350
|
|
|
|
def get_curve_type(self, calcurve):
|
|
logformat, range_limit = super().get_curve_type(calcurve)
|
|
excit = 0
|
|
if self.intype[0] == 3 and calcurve.calibrange[0] > 0.2:
|
|
excit = 1 # TODO: add extra parameter for excitation
|
|
self.intype += (excit,)
|
|
return logformat, range_limit
|
|
|
|
|
|
class Loop350(Loop, Sensor350):
|
|
pass
|
|
|
|
|
|
class MainOutput350(Output):
|
|
model = 350
|
|
output_no = Parameter(datatype=IntRange(1, 1))
|
|
imax = 1.732
|
|
max_power = Parameter(datatype=FloatRange(0, 75, unit='W'))
|
|
|
|
|
|
class AnalogOutput350(AnalogOutput):
|
|
model = 350
|
|
output_no = Parameter(datatype=IntRange(3,4))
|
|
|
|
|
|
# --- MODEL 224 ---
|
|
|
|
class Device224(Device2):
|
|
model = 224
|
|
channels = 'A', 'B', 'C1', 'C2', 'C3', 'C4', 'C5', 'D1', 'D2', 'D3', 'D4', 'D5'
|
|
|
|
|
|
class Sensor224(Sensor):
|
|
model = 224
|