frappy/frappy_psi/lakeshore.py

1254 lines
47 KiB
Python

# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Oksana Shliakhtun <oksana.shliakhtun@psi.ch>
# Markus Zolliker <markus.zolliker@psi.ch>
# *****************************************************************************
"""driver for various lakeshore temperature monitors/controllers"""
import time
import math
import random
import threading
import numpy as np
from numpy.testing import assert_approx_equal
from frappy.core import Module, Readable, Parameter, Property, \
HasIO, StringIO, Writable, IDLE, ERROR, BUSY, DISABLED, nopoll, Attached
from frappy.datatypes import IntRange, FloatRange, StringType, \
BoolType, ValueType, StatusType
from frappy.errors import CommunicationFailedError, ConfigError, \
HardwareError, DisabledError, ImpossibleError, secop_error, SECoPError
from frappy.lib.units import NumberWithUnit, format_with_unit
from frappy.lib import formatStatusBits
from frappy_psi.convergence import HasConvergence
from frappy.mixins import HasOutputModule, HasControlledBy
from frappy.extparams import StructParam
from frappy_psi.calcurve import CalCurve
def string_to_num(string):
try:
return int(string)
except ValueError:
return float(string)
class IO(StringIO):
"""IO classes of most LakeShore models will inherit from this"""
end_of_line = '\n'
wait_before = -1
# ValueType(str): accept and convert also numbers
model = Property('model name', ValueType(str), value='')
def initModule(self):
if self.wait_before < 0:
# wait before is not needed when connected via LAN
# as the port number is always 7777, the following is a quite
# good guess
self.wait_before = 0 if ':7777' in self.uri else 0.05
super().initModule()
def checkHWIdent(self):
self.identification = [('*IDN?', f'LSCI,MODEL{self.model},.*')]
super().checkHWIdent()
class HasLscIO(HasIO):
ioClass = IO
def query(self, msg, *converters):
"""send a query and parse result
:param msg: the query message, contains a '?'
:param converters: converters for the result values
:return: the converted value(s)
when more than one converter is given, this is a list
"""
response = self.communicate(msg).strip()
if not converters:
return response
values = sum((v.split(',') for v in response.split(';')), start=[])
if len(values) < len(converters):
raise CommunicationFailedError(
f'expected {len(converters)} values,'
f' got {values} as reply to {msg}')
result = [c(v.strip()) for (c, v) in zip(converters, values)]
return result[0] if len(converters) == 1 else result
def command(self, msghead, *args):
"""send a command to change parameters and query the replies
when no args are given, the command and a dummy request '*OPC?'
is sent in order to wait for completion.
:param msghead: the message, including the arguments needed
for the query (e.g. channel, output_no)
:param args: parameters to set.
the converters for the reply are taken from their types
:return: the converted readback value(s)
when more than one argument is given, this is a list
"""
if not args:
self.communicate(f'{msghead};*OPC?')
return None
# when an argument is given as integer, it might be that this argument might be a float
converters = [string_to_num if isinstance(a, int) else type(a) for a in args]
values = [a if isinstance(a, str) else f'{a:g}'
for c, a in zip(converters, args)]
if ' ' in msghead:
query = msghead.replace(' ', '? ', 1)
cmd = ','.join([msghead] + values)
else:
query = msghead + '?'
cmd = f"{msghead} {','.join(values)}"
return self.query(f'{cmd};{query}', *converters)
class Device(HasLscIO, Module):
"""common SECoP module for curve handling"""
ioClass = IO
status = Parameter('status for installing calib', StatusType(Readable, 'BUSY'))
# ValueType(str): accept and convert also numbers
model = Property('model name', ValueType(str), value='')
curve_handling = Property('is curve handling enabled', BoolType(), default=False)
curve_cache = None
channels = None # sequence of available channels
user_curves = None # range of user curves (tuple)
log_formats = True, False # tuple (<log Ohm allowed>, <log K allowed>)
max_curve_length = 200
cmds_per_line = 5 # number of commands / replies allowed on one line
_empty_curves = None
_request_lock = None
FAST_POLL = 0.01
def initModule(self):
self._empty_curves = {} # dict <curve no> of None (used as ordered set)
self._curve_map = {} # dict CurveHeader.key of curve_no
self._requests = {} # dict <calcurve> of CurveRequest
self._sensors = {} # dict <channel> of sensors
self._disable_channels = set(self.channels)
self._request_lock = threading.Lock()
self.io.setProperty('model', self.model)
super().initModule()
def curve_request(self, sensor):
"""register a sensor for checking or uploading the calibration curve
:param sensor: the sensor module
"""
with self._request_lock:
prev_calcurve = getattr(self._sensors.get(sensor.channel), 'calcurve', None)
self._sensors[sensor.channel] = sensor
if prev_calcurve in self._requests:
for ch, sens in self._sensors.items():
if prev_calcurve == sens.calcurve:
break
else:
self._requests.pop(prev_calcurve, None)
self.log.info('interrupt installing %s', prev_calcurve)
prev_request = self._requests.get(sensor.calcurve)
req = CurveRequest(sensor, self)
if prev_request:
if np.array_equal(prev_request.points, req.points):
# a request is already running and the curve content has not changed
self.log.info('already installing %s', sensor.calcurve)
req.add_sensor(sensor)
else:
self.log.info('file has changed, restart treating %s', sensor.calcurve)
self._requests[sensor.calcurve] = req
else:
self._requests[sensor.calcurve] = req
self._disable_channels.discard(sensor.channel)
self.setFastPoll(True, self.FAST_POLL)
def doPoll(self):
"""this is not really a poller, but a worker task
as loading and checking curves takes a while, this is done in the
background. does nothing after all curves are checked / loaded
and is reactivated only when curves are changed during runtime
"""
if not self.curve_handling or not self.io.is_connected:
return
if not self.curve_cache:
self.curve_cache = {}
headers = self.get_headers()
for curve_no in range(*self.user_curves):
crvhdr = CurveHeader(*headers[curve_no].split(','))
self.curve_cache[curve_no] = crvhdr
if crvhdr.key:
if crvhdr in self._curve_map:
# this is a duplicate, add to empty curves
self._empty_curves[curve_no] = None
else:
self._curve_map[crvhdr.key] = curve_no
try:
check_item = None
with self._request_lock:
for key, request in self._requests.items():
if request.loading:
break
if request.status[0] != ERROR:
check_item = key, request
else:
# no loading action found -> do check action
if check_item:
key, request = check_item
else:
request = None
if request:
request.action = request.action(request)
if request.action is not None:
return
self.finish_curve(self._requests.pop(key))
# no more requests pending
while self._disable_channels:
ch = self._disable_channels.pop()
sensor = self._sensors.get(ch)
if sensor is None or sensor.disabled:
self.disable_channel(ch)
except Exception as e:
request.loading = False
request.status = ERROR, repr(e)
# self._requests.pop(key)
raise
if not self._requests:
self.setFastPoll(False)
def read_status(self):
status = IDLE, ''
for req in self._requests.values():
if req.loading:
return req.status
status = req.status
return status
def get_empty(self):
"""get curve no of a curve to be reused"""
if self._empty_curves:
# we have unused curve slots
curve_no = next(iter(self._empty_curves))
self._empty_curves.pop(curve_no)
else:
used_no = set(s.curve_no for s in self._sensors.values())
for req in self._requests.values():
used_no.add(req.invalidate)
n0, n = self.user_curves
n -= n0
# avoid to take the lower numbers first
# as then the most recent curves would be overridden
offset = random.randrange(n)
for i in range(n):
curve_no = n0 + (i + offset) % n
if curve_no not in used_no:
break
else:
raise ValueError('no empty curves available')
return curve_no
def get_calib_state(self, sensor):
if not self.io.is_connected:
return ERROR, 'no connection'
req = self._requests.get(sensor.calcurve)
if req:
if req.loading:
return ImpossibleError(f'can not read T while {req.status[1]}')
if req.status[0] >= ERROR:
return ImpossibleError('error while loading calibration curve')
return None
def verify_ends(self, request):
"""preliminary check: check if the ends of the curve are matching the stored ones"""
npnt = len(request.points)
numbers = [1, npnt]
points = [request.points[0], request.points[npnt-1]]
if npnt < self.max_curve_length:
numbers.append(npnt + 1)
points.append((0, 0))
for pairs in zip(points, self.get_crvpts(request.curve_no, *numbers)):
if not self.is_equal(*pairs):
return False
return True
def find_curve(self, request):
"""try to find curve and return required action
:param request: the curve request
:return: next action
"""
no = self._curve_map.get(request.crvhdr.key)
if no:
request.crvhdr = self.curve_cache[no]
request.set_curve_no(no)
if self.verify_ends(request):
self.log.info('calcurve #%d %s found, start to check consistency', no, request.crvhdr)
# guess the curve is o.k. -> install
request.install_sensors(request.sensors.values())
request.pointer = 1
request.loading = False
request.status = IDLE, 'checking calcurve'
for sensor in request.sensors.values():
sensor.pollInfo.trigger()
return self.check_points
request.invalidate = no
request.set_curve_no(self.get_empty())
request.install_sensors(request.sensors.values())
self.log.info('%s found, but content has changed, create #%d',
request.crvhdr.sn, request.curve_no)
return self.start_load
request.set_curve_no(self.get_empty())
for sensor in request.sensors.values():
sensor.install_sensor()
self.log.info('load curve #%d %s', request.curve_no, request.crvhdr)
return self.start_load
def check_points(self, request):
"""check next point
:param request: the curve request
:return: next action
"""
if request.pointer >= len(request.points):
return None # finish
if request.new_sensors:
# install sensors for the same curve added in the meantime
sensors = request.new_sensors
request.new_sensors = set()
request.install_sensors(sensors)
given = request.points[request.pointer:request.pointer + self.cmds_per_line]
first = request.pointer + 1
no = request.curve_no
for n, (pt, ptg) in enumerate(zip(self.get_crvpts(no, *range(first, first + len(given))), given)):
if not self.is_equal(pt, ptg):
self.log.info('reply (%g, %g) does not match given point %d (%g,%g)',
*pt, first + n, *ptg)
request.invalidate = no
request.set_curve_no(self.get_empty())
self.log.info('%s has changed, create #%d', request.crvhdr.sn, request.curve_no)
request.loading = True
return self.start_load
request.pointer += self.cmds_per_line
return self.check_points
def start_load(self, request):
"""start loading a curve
:param request: the curve request
:return: next action
"""
request.status = BUSY, 'loading calcurve'
# delete first to make sure there is nothing left after the loaded points
self.command(f'CRVDEL {request.curve_no}')
# write a temporary header
# (in case loading will not finish, the curve should be marked as empty)
header = list(request.crvhdr)
self.put_header(request.curve_no, 'loading...', '', *header[2:])
request.pointer = 0
return self.load_points
def load_points(self, request):
"""load the next point(s)
:param request: the curve request
:return: next action
"""
try:
given = request.points[request.pointer:request.pointer+self.cmds_per_line]
first = request.pointer + 1
cmds = [f'CRVPT {request.curve_no},{first + n},{x:g},{y:g};'
f'CRVPT?{request.curve_no},{first + n}' for n, (x, y) in enumerate(given)]
for n, (reply, pt) in enumerate(zip(self.communicate(';'.join(cmds)).split(';'), given)):
if not self.is_equal([float(r) for r in reply.split(',')], pt):
# self.log.error('reply %r does not match given point %d (%g,%g)'
# '-> please check for values outside allowed range', reply, first + n, *pt)
raise HardwareError(f'reply {reply.strip()} does not match given point '
f'{first + n} {pt[0]:g},{pt[1]:g}'
'-> please check for values outside allowed range')
request.pointer += self.cmds_per_line
return None if request.pointer >= len(request.points) else self.load_points
except Exception as e:
self.log.exception('error in load_points %s', e)
raise
def finish_curve(self, request):
"""write header and assign to channel(s)
:param request: the curve request
:return: next action
"""
self.curve_cache[request.curve_no] = CurveHeader(*request.crvhdr)
self._curve_map[request.crvhdr.key] = request.curve_no
self.put_header(request.curve_no, *request.crvhdr)
request.install_sensors(request.sensors.values())
no = request.invalidate
comment = 'finished' if request.loading else 'verified'
if no:
self.log.info('calcurve #%d %s %s, clear previous #%d',
request.curve_no, request.crvhdr, comment, no)
self.curve_cache[no] = CurveHeader()
self._empty_curves[no] = None
else:
self.log.info('calcurve #%d %s %s',
request.curve_no, request.crvhdr, comment)
request.status = None
for sensor in request.sensors.values():
try:
sensor.pollInfo.trigger()
except Exception:
pass
def put_header(self, curve_no, name, sn, fmt, limit, coef):
"""write header"""
self.communicate(f'CRVHDR {curve_no},"{name}","{sn}",{fmt},{limit},{coef};*OPC?')
def is_equal(self, left, right, fixeps=(1.1e-5, 1.1e-5), significant=6):
"""check whether a returned calibration point is equal within curve point precision"""
for v1, v2, eps in zip(left, right, fixeps):
try:
assert_approx_equal(v1, v2, significant, verbose=False)
except AssertionError:
return abs(v1 - v2) < eps
return True
def get_crvpts(self, curve_no, *numbers):
"""read curve points
:param curve_no: curve number
:param numbers: indices of points to retrieve
:return: list of points (x,y)
"""
replies = []
for i in range(0, len(numbers), self.cmds_per_line):
cmds = [f'CRVPT?{curve_no},{n}' for n in numbers[i:i+self.cmds_per_line]]
replies.extend(self.communicate(';'.join(cmds)).split(';'))
return [[float(v) for v in xy.split(',')] for xy in replies]
def get_headers(self):
"""get all headers
for performance reasons, by default 5 headers are read in one line
this is speeing up quite a lot on serial connections
"""
n0, n1 = self.user_curves
result = {}
for ni in range(n0, n1, self.cmds_per_line):
cmd = ';'.join(f'CRVHDR?{n}' for n in range(ni, min(n1, ni + self.cmds_per_line)))
for i, reply in enumerate(self.communicate(cmd).split(';')):
result[ni + i] = reply
return result
def disable_channel(self, channel):
self.command(f'INTYPE {channel},0')
class CurveRequest:
invalidate = None
loading = True
status = BUSY, 'looking up calcurve'
def __init__(self, sensor, device):
self.action = device.find_curve
self.new_sensors = set()
self.sensors = {sensor.channel: sensor}
calcurve = CalCurve(sensor.calcurve)
equipment_id = device.propertyValues.get('original_id') or device.secNode.equipment_id
name = f"{equipment_id.split('.')[0]}.{sensor.name}"
sn = calcurve.calibname
limit = calcurve.calibrange[1]
unit = calcurve.options.get('unit', 'Ohm')
logformat, range_limit = sensor.get_curve_type(calcurve)
self.points = calcurve.export(logformat, device.max_curve_length)
if range_limit:
x0, x1 = range_limit
if x0 > calcurve.xrange[0] or calcurve.xrange[1] > x0:
# we do not limit in the first place already, because of the following warning
if calcurve.xrange[1] > x1:
device.log.warn('curve %s cut at %g %s (upper end was %g %s)',
sn, x1, unit, calcurve.xrange[1], unit)
if x0 > calcurve.xrange[0]:
device.log.warn('curve %s cut at %g %s (lower end was %g %s)',
sn, x0, unit, calcurve.xrange[0], unit)
self.points = calcurve.export(logformat, device.max_curve_length, xlimits=range_limit)
ymax = max(self.points[0, 1], self.points[-1, 1])
limit = min(limit, 10 ** ymax if calcurve.logformat[1] else ymax)
device.log.info('exported %d points from %s', len(self.points), calcurve.filename)
if unit == 'Ohm':
if calcurve.logformat[1]:
fmt = 5
elif calcurve.logformat[0]:
fmt = 4
else:
fmt = 3
else:
fmt = 2 if unit == 'V' else 1
coef = 1 + (self.points[0][1] < self.points[-1][1]) # 1: ntc, 2: ptc
self.crvhdr = CurveHeader(name, sn, fmt, limit, coef)
def set_curve_no(self, curve_no):
self.curve_no = curve_no
for s in self.sensors.values():
s.curve_no = curve_no
def add_sensor(self, sensor):
self.sensors[sensor.channel] = sensor
self.new_sensors.add(sensor)
def install_sensors(self, sensors=None):
for sensor in sensors:
sensor.install_sensor()
sensor.install_curve()
class CurveHeader(tuple):
"""curve header class
carries metadata of curves
"""
def __new__(cls, name='', sn='', fmt=0, limit=0, coef=0):
"""initialize curve header. convert from string if needed
:param name: a name
:param sn: the serial 'number' - is a string as it contains letters
:param fmt: 1..5 (mV,K), (V,K), (Ohm,K), (log Ohm, K), (log Ohm, log K)
:param limit: setpoint limit (K)
:param coef: 1, 2 (negative, positive)
"""
return tuple.__new__(cls, (name.strip(), sn.strip(), int(fmt), float(limit), int(coef)))
@property
def name(self):
return self[0]
@property
def sn(self):
return self[1]
@property
def fmt(self):
return self[2]
@property
def limit(self):
return self[3]
@property
def coef(self):
return self[4]
@property
def key(self):
"""key for curve lookup"""
if self.sn:
return self.sn.lower(), self.fmt, self.coef
return None
def get_cfg_value(cfgdict, key, default=object):
# get value from a cfgdict
param = cfgdict.get(key)
try:
return param['value']
except TypeError:
# param is not a dict
return param
except KeyError:
if type(param).__name__ != 'Param':
return param
if default is object:
raise KeyError(f'cfg dict does not contain {key!r}')
return default
class Base(HasLscIO):
model = None
device = Attached(Device, mandatory=False) # device is not needed without curve handling
def initModule(self):
if self.device and not self.io:
self.io = self.device.io.name
super().initModule()
class Sensor(Base, Readable):
"""base class for sensors"""
value = Parameter(unit='K')
status = Parameter(datatype=StatusType(Readable, 'DISABLED', 'BUSY'))
raw = Parameter('raw sensor value', datatype=FloatRange(unit='Ohm'), default=0)
channel = Property('used channel', StringType()) # input
calcurve = Parameter('calibration curve name', StringType(), readonly=False)
enabled = Parameter('enable flag', BoolType(), readonly=False)
classes = {} # dict <model> of class
curve_no = None
STATUS_BIT_LABELS = 'invalid_reading old_reading b2 b3 t_under t_over units_zero units_overrange'.split()
_read_error = None
_raw_error = None
_do_read = True
_curve_handling = False # True when a device is present and device.curve_handling is True
# TODO: implement alarms
def initModule(self):
super().initModule()
if self.device:
self._curve_handling = self.device.curve_handling
if not self._curve_handling:
self.parameters['calcurve'].setProperty('export', False)
def doPoll(self):
self.read_status() # polls also value and raw
@nopoll
def read_value(self):
self.status, value, self.raw = self.get_data()
if isinstance(value, SECoPError):
raise value.copy()
return value
@nopoll
def read_raw(self):
self.status, self.value, raw = self.get_data()
if isinstance(raw, SECoPError):
raise raw.copy()
return raw
def read_status(self):
try:
status, self.value, self.raw = self.get_data()
except Exception as e:
self.raw = self.value = secop_error(e)
raise
return status
def get_data(self):
"""get reading status, raw and Kelvin value with minimal delay"""
status = IDLE, ''
if self.enabled:
ch = self.channel
rdgst, raw, value = self.query(f'RDGST?{ch};SRDG?{ch};KRDG?{ch}', int, float, float)
rdgst &= 0xfd # suppress old reading
if rdgst:
statuslist = formatStatusBits(rdgst, self.STATUS_BIT_LABELS)
status = ERROR, statuslist[-1] # show only the most fatal error
value = HardwareError(statuslist[-1])
while statuslist and statuslist[-1].startswith('t_'):
statuslist.pop()
if statuslist:
raw = HardwareError(statuslist[-1])
elif self._curve_handling:
value_error = self.device.get_calib_state(self)
if value_error:
value = value_error
status = ERROR, str(value_error)
else:
raw = value = DisabledError('disabled')
status = DISABLED, 'disabled'
self.enable = False
return status, value, raw
def write_calcurve(self, calibname):
if not self._curve_handling:
raise ConfigError('curve handling is off')
self.enabled = True
self.device.curve_request(self)
def write_enabled(self, value):
if self._curve_handling:
if value:
self.write_calcurve(self.calcurve)
else:
self.device.disable_channel(self.channel)
def read_enabled(self):
if not self.query(f'INTYPE?{self.channel}', int):
return False
return self.enabled
def get_curve_type(self, calcurve):
unit = calcurve.options.get('unit', 'Ohm')
logformat = False
range_limit = (1e-5, 9999.99)
if unit == 'Ohm':
if calcurve.ptc:
# <sensor type>,<autorange>,<range>,<compensation>,<units>
self.intype = 2, 1, 6, 1, 1 # PTC
else:
range_limit = (1e-5, 99999.9)
logformat = True, False
self.intype = 3, 1, 8, 1, 1 # NTC
elif unit == 'V': # diode
self.intype = (1, 1, 0, 0, 1) if calcurve.xscale[1] <= 2.5 else (1, 1, 1, 0, 1)
else: # thermocouple
self.intype = (4, 0, 0, 1, 1)
return logformat, range_limit
def install_sensor(self):
if self.query(f'INTYPE?{self.channel}', *(type(v) for v in self.intype)) != self.intype:
self.command(f'INTYPE {self.channel}', *self.intype)
def install_curve(self):
if self.query(f'INCRV?{self.channel}', int) != self.curve_no:
self.command(f'INCRV {self.channel}', self.curve_no)
class Output(Base, HasControlledBy, Writable):
"""base class for heater output"""
classes = {} # dict <model> of dict <output_no> of class
value = Parameter('heater output', FloatRange(0, 100, unit='W'))
target = Parameter('manual heater output', FloatRange(0, 100, unit='W'), default=0)
htr = Parameter('heater output current percentage', FloatRange(0, 100, unit='%'))
max_heater = Parameter('desired max. heater with unit W, A or V', StringType(), readonly=False)
max_power = Parameter('''max heater power\n
value will be clamped to allowed range and may be rounded to discrete values
''', FloatRange(0, 100, unit='W'), readonly=False)
# for the case when several outputs are possible:
output_no = Parameter('lakeshore output or loop number', IntRange(1, 4), default=1)
resistance = Parameter('heater resistance', FloatRange(10, 100), readonly=False, default=25)
Extension = None
AMP_VOLT_WATT = NumberWithUnit('A', 'V', 'W')
imax = None
vmax = None
n_ranges = None
n_currents = None
_power_scale = 1e-4
# sorted_factors: list of possible current squares (I^2, idx)
# (multiply I^2 with resistance to get power)
sorted_factors = None
errorstatus = None
_desired_max_power = None
_control_loop = None # a loop module object when controlled, None when in manual mode
power_offset = 0 # offset for closed_loop
def configure(self):
"""configure the output
based on self._control_loop, self.resistance and self._desired_max_power
"""
raise NotImplementedError
def set_closed_loop(self, loop):
"""set to control mode
:param loop: the temperature loop module
"""
if self._control_loop != loop:
self.log.info(f'set output to be controlled on channel {loop.channel}')
self._control_loop = loop
self.configure()
else:
self.fix_heater_range()
def set_open_loop(self):
"""set to open loop"""
if self._control_loop is not None:
self.log.info('put output into manual mode')
self._control_loop = None
self.configure()
else:
self.fix_heater_range()
def fix_heater_range(self):
"""switch on heater range, if off"""
def write_output_no(self, output_no):
if self._desired_max_power is not None:
self.output_no = output_no # early update needed by configure()
self.configure()
def write_resistance(self, resistance):
if self._desired_max_power is not None:
self.resistance = resistance # early update needed by configure()
self.configure()
def get_best_power_idx(self, max_power, round_down_limit=1.0001):
"""get best index from sorted_factors to match given max_current"""
prev_pwr = 0
prev_idx = None
for fact, idx in self.sorted_factors:
pwr = min(fact * self.resistance, self.vmax ** 2 / self.resistance)
if pwr >= max_power:
if pwr == prev_pwr:
# happens when we reached voltage compliance limit
return prev_idx
if max_power - prev_pwr < pwr - max_power and max_power < prev_pwr * round_down_limit:
# previous value is closer and not above round down limit
return prev_idx
return idx
prev_idx = idx
return prev_idx
def calc_power(self, percent):
# power is limited by voltage compliance
return min(percent ** 2 * self._power_scale * self.resistance,
self.vmax ** 2 / self.resistance)
def calc_percent(self, power):
return min(100., math.sqrt(power / self.resistance / self._power_scale))
def read_status(self):
"""get heater status"""
return IDLE, ''
def put_manual_power(self, value):
self.command(f'MOUT {self.output_no}', value)
def read_value(self):
return self.calc_power(self.read_htr())
def read_htr(self):
"""read heater percent"""
raise NotImplementedError
def write_max_power(self, max_power):
self._desired_max_power = max_power
self.configure()
return self.calc_power(100)
def write_max_heater(self, value):
number, unit = self.AMP_VOLT_WATT.parse(value)
pmax = self['target'].datatype.max
if unit == 'A':
power = self.write_max_power(min(pmax, number ** 2 * self.resistance))
number = math.sqrt(power / self.resistance)
elif unit == 'V':
power = self.write_max_power(min(pmax, number ** 2 / self.resistance))
number = math.sqrt(power * self.resistance)
else:
number = self.write_max_power(min(pmax, number))
return format_with_unit(number, unit, 3)
def write_target(self, target):
self.self_controlled()
self.put_manual_power(target)
class MainOutput(Output):
"""power output with adjustable power
including common code for main output(s) on 336 / 350
"""
HTRST_MAP = {
0: (IDLE, ''),
1: (ERROR, 'Open heater load'),
2: (ERROR, 'Heater short')
}
_htr_range = 1
heater_ranges = {5 - i: 10 ** -i for i in range(5)}
sorted_factors = sorted((v, i) for i, v in heater_ranges.items())
def read_status(self):
st = self.query(f'HTRST? {self.output_no}', int)
return self.HTRST_MAP[st]
def configure(self):
if self._desired_max_power is None:
self.log.info(f'max_heater {self.writeDict} {self.max_heater}')
self.write_max_heater(self.max_heater)
self._htr_range = irng = self.get_best_power_idx(self._desired_max_power)
user_current = max(0.1, min(self.imax, 2 * math.sqrt(self._desired_max_power /
self.heater_ranges[irng] / self.resistance)))
self._power_scale = user_current ** 2 * self.heater_ranges[irng] / 1e4
self.command(f'HTRSET {self.output_no}', 1 if self.resistance < 50 else 2, 0, user_current, 1)
# self.command(f'CDISP {self.output_no}', 1, self.resistance, 1, 0)
if self._control_loop is None:
mode = self.query(f'OUTMODE?{self.output_no}', int)
if mode != 3: # open loop
self.command(f'OUTMODE {self.output_no}', 3) # control off
# self.command(f'OUTMODE {self.output_no}', 3, self.channel, 0) # control off
self.command(f'RANGE {self.output_no}', self._htr_range)
self.put_manual_power(self.target)
else:
self.command(f'OUTMODE {self.output_no}', 1, self._control_loop.channel, 0) # control on
self.command(f'RANGE {self.output_no}', self._htr_range)
self.put_manual_power(self._control_loop.power_offset)
def read_max_power(self):
curidx, self._user_current = self.query(f'HTRSET? {self.output_no}', int, int, float)[1:3]
if not self._user_current:
self._user_current = 2.0 ** (curidx * 0.5 - 1)
self._htr_range = self.query(f'RANGE?{self.output_no}', int) or self._htr_range
self._power_scale = self.imax ** 2 * self.heater_ranges[self._htr_range] / 1e4
return self.calc_power(100)
def fix_heater_range(self):
# switch heater range on, if needed
irng = self.query(f'RANGE?{self.output_no}', int)
if irng != self._htr_range:
if irng:
self.log.info('output range was changed manually')
self._htr_range = irng
else:
self.log.info('output was off - switch on again')
self.command(f'RANGE {self.output_no}', self._htr_range)
def read_htr(self):
return self.query(f'HTR? {self.output_no}', float)
class AnalogOutput(Output):
"""low power (max. 1W) output"""
output_no = Parameter(datatype=IntRange(3, 4), default=3)
resistance = Parameter(default=100)
max_power = Parameter(readonly=True)
max_heater = Parameter(readonly=True)
vmax = 10
imax = 0.1
def read_max_power(self):
# max power can not be changed
# -> the return value depends on the resistance only
self._power_scale = self.vmax ** 2 / 1e4
return self.calc_power(100)
def write_max_power(self, value):
# max power is not configurable
return self.read_max_power()
def write_output_no(self, output_no):
self.output_no = output_no
self.configure()
def calc_power(self, percent):
# power is limited by max current
return min(percent ** 2 * self._power_scale / self.resistance,
self.imax ** 2 / self.resistance)
def configure(self):
if self._control_loop is None:
# 3: open loop, 0: powerup enable off
self.command(f'OUTMODE {self.output_no}', 3, 0, 0)
self.put_manual_power(self.target)
else:
# 1: closed loop, 0: powerup enable off
self.command(f'OUTMODE {self.output_no}', 1, self._control_loop.channel, 0)
self.put_manual_power(self._control_loop.power_offset)
def read_htr(self):
return self.query(f'AOUT?{self.output_no}', float)
class Loop(HasConvergence, HasOutputModule, Sensor):
output_module = Attached(Output)
# this creates individual parameters pid_p, pid_i and pid_d automatically
ctrlpars = StructParam('ctrlpars struct', {
'p': Parameter('control parameter p', FloatRange(0, 1000)),
'i': Parameter('control parameter i', FloatRange(0, 1000)),
'd': Parameter('control parameter d', FloatRange(0, 1000)),
}, prefix='pid_', readonly=False)
power_offset = Parameter('power offset\n\n(using LakeShores Manual Output)',
FloatRange(0, 100, unit='W'), readonly=False, default=0)
classes = {}
def __init_subclass__(cls):
super().__init_subclass__()
if cls.model: # do not register abstract base classes
cls.model = str(cls.model)
cls.classes[cls.model] = cls
def write_ctrlpars(self, pid):
pid = self.command(f'PID {self.output_module.output_no}', *[pid[k] for k in 'pid'])
return dict(zip('pid', pid))
def read_ctrlpars(self):
pid = self.query(f'PID?{self.output_module.output_no}', float, float, float)
return dict(zip('pid', pid))
def write_target(self, value):
sensor_status = Sensor.read_status(self)
if sensor_status[0] >= BUSY:
raise ImpossibleError(f'can not control while status is {sensor_status}')
self.activate_control()
self.output_module.set_closed_loop(self)
self.set_target(value)
def write_power_offset(self, value):
if self.control_active:
self.output_module.put_manual_power(value)
def read_target(self):
return self.get_target()
def set_target(self, value):
return self.command(f'SETP {self.output_module.output_no}', float(value))
def get_target(self):
return self.query(f'SETP?{self.output_module.output_no}', float)
# --- MODEL 340 ---
class IO340(IO):
timeout = 5 # needed for INCRV command
model = 340
end_of_line = '\r' # default at SINQ. TODO: remove
class Device340(Device):
ioClass = IO340
model = 340
channels = 'ABCD'
user_curves = (21, 61) # the last curve is 60
log_formats = True, True # log Ohm / log K is supported
_crvsav_deadline = None
def disable_channel(self, channel):
self.communicate(f'INSET {channel},0;*OPC?')
def finish_curve(self, request):
super().finish_curve(request)
if request.loading and not self._crvsav_deadline:
# when loading a new curve, remember for sending CRVSAV later
self._crvsav_deadline = time.time() + 600
def put_header(self, curve_no, name, sn, fmt, limit, coef):
self.communicate(f'CRVHDR {curve_no},{name:15s},{sn:10s},{fmt},{limit},{coef};*OPC?')
def doPoll(self):
super().doPoll()
if self._crvsav_deadline:
# prevent flashing multiple times within short time
if time.time() > self._crvsav_deadline:
self._crvsav_deadline = None
self.communicate('CRVSAV;*OPC?')
def is_equal(self, left, right, fixeps=(1.1e-5, 1.1e-4), significant=6):
# for whatever reason, the number of digits after decimal point for the T column is only 4
return super().is_equal(left, right, fixeps, significant)
class Sensor340(Sensor):
model = 340
intype = None # arguments for the intype command
def get_curve_type(self, calcurve):
unit = calcurve.options.get('unit', 'Ohm')
logformat = False
range_limit = None
if unit == 'Ohm':
if calcurve.ptc:
xlim = calcurve.xrange[1] * 0.001 # 1 mA excitation
for rng, limit in enumerate(
[0, 0.001, 0.0025, 0.005, 0.01, 0.025, 0.05,
0.1, 0.25, 0.5, 1, 2.5, 5]):
if xlim <= limit:
break
else:
rng = 13
# we use special type here, in order to allow thermal compensation
# <type>, <units>, <coefficient>, <excitation>, <range>
self.intype = 0, 2, 2, 10, rng
else:
if calcurve.options.get('type') == 'GE':
self.intype = (10,)
else:
self.intype = (8,) # carbon and ruox are equivalent to cernox(8)
logformat = True, True
elif unit == 'V': # diode
self.intype = (1,) if calcurve.xscale[1] < 2.5 else (2,)
else: # thermocouple
self.intype = (12,)
return logformat, range_limit
def install_sensor(self):
super().install_sensor()
if self.query(f'INSET?{self.channel}', int, int) != (1, 1):
self.command(f'INSET {self.channel}', 1, 1)
class Loop340(Loop, Sensor340):
pass
class MainOutput340(MainOutput):
model = 340
output_no = Parameter(datatype=IntRange(1, 1))
HTRST_MAP = {
0: (IDLE, ''),
1: (ERROR, 'Power supply over voltage'),
2: (ERROR, 'Power supply under voltage'),
3: (ERROR, 'Output digital-to-analog Converter error'),
4: (ERROR, 'Current limit digital-to-analog converter error'),
5: (ERROR, 'Open heater load'),
6: (ERROR, 'Heater load less than 10 ohms')
}
_manual_output = 0.0 # TODO: check how to set this
vmax = 50
imax = 2
max_currents = {4 - i: 2 ** (1 - i) for i in range(4)}
SETPOINTLIMS = 1500.0
sorted_factors = sorted([(fhtr * fcur ** 2, (i, h))
for i, fcur in max_currents.items()
for h, fhtr in MainOutput.heater_ranges.items()
])
def get_status(self):
st = self.query(f'HTRST?', int)
return self.HTRST_MAP[st]
def configure(self):
icurrent, htr_range = self.get_best_power_idx(self._desired_max_power, 1.1)
self._power_scale = self.max_currents[icurrent] ** 2 * self.heater_ranges[htr_range] / 1e4
self.command(f'CLIMIT {self.output_no}', self.SETPOINTLIMS, 0.0, 0.0, icurrent, htr_range)
self._htr_range = htr_range
if self._control_loop is None:
mode = self.query(f'CMODE?{self.output_no}', int)
if mode != 3: # open loop
self.command(f'CSET {self.output_no}', '0', 1, 0, 0) # control off
self.command(f'CMODE {self.output_no}', 3) # open loop
self.command('RANGE', self._htr_range)
self.put_manual_power(self._manual_output)
else:
self.command(f'CSET {self.output_no}', self._control_loop.channel, 1, 1, 0) # control on
self.command(f'CMODE {self.output_no}', 1) # pid
self.command('RANGE', self._htr_range)
self.put_manual_power(self._control_loop.power_offset)
self.command(f'CDISP {self.output_no}', 1, self.resistance, 1, 0)
def fix_heater_range(self):
# switch heater range on, if needed
irng = self.query('RANGE?', int)
if irng != self._htr_range:
if irng:
self.log.info('output range was changed manually')
self._htr_range = irng
else:
self.log.info('output was off - switch on again')
self.command('RANGE', self._htr_range)
def read_max_power(self):
icurrent, htr_range = self.query(f'CLIMIT? {self.output_no}', float, float, float, int, int)[3:]
htr_range = self.query('RANGE?', int) or htr_range
self._htr_range = htr_range
self._power_scale = self.max_currents[icurrent] ** 2 * self.heater_ranges[htr_range] / 1e4
return self.calc_power(100)
def read_htr(self):
return self.query('HTR?', float)
class AnalogOutput340(AnalogOutput):
model = 340
output_no = Parameter(datatype=IntRange(2, 2), default=2)
def configure(self):
if self._control_loop is not None:
self.put_manual_power(self.target)
else:
self.command('ANALOG 2', 0, 3, self._control_loop.channel)
self.put_manual_power(self._control_loop.power_offset)
def put_manual_power(self, value):
if self._control_loop is None:
self.command('ANALOG 2', 0, 2, 0, 0, 0, 0, 0, self.calc_percent(value))
else:
self.command(f'MOUT {self.output_no}', self.calc_percent(value))
# --- MODELS 336, 350, 224, ...
class Device2(Device):
"""second generation LakeShore models"""
channels = 'ABCD'
user_curves = (21, 60) # the last curve is 59
max_raw_unit = 99999
TYPES = {'DT': 1, 'TG': 1, 'PT': 2, 'RF': 2, 'CX': 3, 'RX': 3, 'CC': 3, 'GE': 3, 'TC': 4}
# --- MODEL 336 ---
class IO336(IO):
model = 336
class Device336(Device2):
model = 336
class Sensor336(Sensor):
model = 336
class Loop336(Loop, Sensor336):
pass
class MainOutput336(MainOutput):
model = 336
output_no = Parameter(datatype=IntRange(1, 1))
imax = 2
vmax = 50
# 3 ranges only
heater_ranges = {3 - i: 10 ** -i for i in range(3)}
sorted_factors = sorted((v, i) for i, v in heater_ranges.items())
class SecondaryOutput336(MainOutput336):
model = 336
output_no = Parameter(datatype=IntRange(2, 2))
imax = 1.414
vmax = 35.4
max_power = Parameter(datatype=FloatRange(0, 50, unit='W'))
class AnalogOutput336(AnalogOutput):
model = 336
output_no = Parameter(datatype=IntRange(3, 4))
# --- MODEL 350 ---
class Device350(Device2):
model = 350
class Sensor350(Sensor):
model = 350
def get_curve_type(self, calcurve):
logformat, range_limit = super().get_curve_type(calcurve)
excit = 0
if self.intype[0] == 3 and calcurve.calibrange[0] > 0.2:
excit = 1 # TODO: add extra parameter for excitation
self.intype += (excit,)
return logformat, range_limit
class Loop350(Loop, Sensor350):
pass
class MainOutput350(Output):
model = 350
output_no = Parameter(datatype=IntRange(1, 1))
imax = 1.732
max_power = Parameter(datatype=FloatRange(0, 75, unit='W'))
class AnalogOutput350(AnalogOutput):
model = 350
output_no = Parameter(datatype=IntRange(3,4))
# --- MODEL 224 ---
class Device224(Device2):
model = 224
channels = 'A', 'B', 'C1', 'C2', 'C3', 'C4', 'C5', 'D1', 'D2', 'D3', 'D4', 'D5'
class Sensor224(Sensor):
model = 224