frappy/frappy_psi/ls372.py
l_samenv 5b0da3ba98 fixes on 2023-11-27
- ls372 autorange: wait one sec. more for switching
- keep only one channel, even after target is reached
- intermediate target only when T is raised, but not when lowered
2024-02-23 10:13:05 +01:00

450 lines
18 KiB
Python

# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
# *****************************************************************************
"""LakeShore Model 372 resistance channel
implements autoscan and autorange by software.
when the autoscan or autorange button is pressed, the state is toggled,
and the hardware mode switched off again.
At startup, the configurable default mode is set, independent of
the hardware state.
"""
import time
import frappy.io
from frappy.datatypes import BoolType, EnumType, FloatRange, IntRange
from frappy.lib import formatStatusBits
from frappy.core import Command, Drivable, Parameter, Property, CommonReadHandler, CommonWriteHandler
from frappy_psi.convergence import HasConvergence
from frappy_psi.channelswitcher import Channel, ChannelSwitcher
# shortcut to the status codes of Drivable
Status = Drivable.Status
# labels for the bits of the reading status word, used by read_status to build
# the status text (presumably least significant bit first, which is consistent
# with STATUS_MASK = 0x3f hiding T_OVER and T_UNDER)
STATUS_BIT_LABELS = 'CS_OVL VCM_OVL VMIX_OVL VDIF_OVL R_OVER R_UNDER T_OVER T_UNDER'.split()
def parse1(string):
    """convert one field of a reply

    :param string: the text of a single field
    :return: an int if the text is a valid integer, else a float if it is
        a valid float, else the unchanged string
    """
    # try the most specific conversion first
    for convert in (int, float):
        try:
            return convert(string)
        except ValueError:
            continue
    return string
def parse(reply):
    """split a comma separated reply and convert each field with parse1

    :param reply: the reply string
    :return: a tuple with one converted item per field
    """
    return tuple(parse1(field) for field in reply.split(','))
class StringIO(frappy.io.StringIO):
    """string communication with a LakeShore 372"""
    # (command, expected reply pattern) pair used to identify the device
    identification = [('*IDN?', 'LSCI,MODEL372,.*')]
    # NOTE(review): presumably a delay in seconds applied before each request
    # - confirm against frappy.io
    wait_before = 0.05
class Switcher(frappy.io.HasIO, ChannelSwitcher):
    """channel switcher for the LakeShore 372"""
    value = Parameter(datatype=IntRange(1, 16))
    target = Parameter(datatype=IntRange(1, 16))
    use_common_delays = Parameter('use switch_delay and measure_delay instead of the channels pause and dwell',
                                  BoolType(), readonly=False, default=False)
    common_pause = Parameter('pause with common delays', FloatRange(3, 200, unit='s'), readonly=False, default=3)
    ioClass = StringIO
    fast_poll = 1
    # last values written to measure_delay / switch_delay (None: never written);
    # they are applied again when use_common_delays is switched on
    _measure_delay = None
    _switch_delay = None
def initialReads(self):
# Set up the instrument: inputs without a configured channel are disabled,
# the display fields are configured and the current scan state is examined.
# NOTE(review): the indentation of this block is missing in this copy, so the
# nesting of the statements below is not certain - confirm against the repository.
# disable unused channels
display = []
# NOTE(review): range(1, 16) stops at channel 15, while value and target are
# declared as IntRange(1, 16) - confirm whether channel 16 should be included
for ch in range(1, 16):
chan = self.channels.get(ch)
if chan:
# channels with a 'raw' attribute get a display entry with unit code 2
if hasattr(chan, 'raw'):
display.append((ch, 2))
display.append((ch, 1))
else:
self.communicate('INSET %d,0,0,0,0,0;INSET?%d' % (ch, ch))
# a different DISPLAY setting is sent when there are more than 4 entries
# (presumably a layout with more fields - confirm in the instrument manual)
if len(display) > 4:
self.communicate('DISPLAY 2,2,1;*OPC?')
else:
self.communicate('DISPLAY 2,1,1;*OPC?')
# one DISPFLD command per entry, display fields are numbered from 1
for i, (ch, unit) in enumerate(display):
self.communicate(f'DISPFLD {i+1},{ch},{unit};*OPC?')
# SCAN? replies with the current channel number and the autoscan flag
channelno, autoscan = parse(self.communicate('SCAN?'))
if channelno in self.channels and self.channels[channelno].enabled:
if not autoscan:
return # nothing to do
else:
channelno = self.next_channel(channelno)
if channelno is None:
self.status = 'ERROR', 'no enabled channel'
return
self.set_active_channel(self.channels[channelno])
def doPoll(self):
"""poll buttons
and check autorange during filter time
"""
# NOTE(review): the indentation of this block is missing in this copy, so the
# nesting of the statements below is not certain - confirm against the repository.
super().doPoll()
self.channels[self.target]._read_value() # check range or read
# SCAN? replies with the current channel number and the autoscan flag
channelno, autoscan = parse(self.communicate('SCAN?'))
if autoscan:
# pressed autoscan button: switch off HW autoscan and toggle soft autoscan
self.autoscan = not self.autoscan
self.communicate('SCAN %d,0;SCAN?' % self.value)
if channelno != self.value:
# channel changed by keyboard, do not yet return new channel
self.log.info('channel changed by keyboard %d!', channelno)
self.write_target(channelno)
chan = self.channels.get(channelno)
if chan is None:
# the channel reported by the device is not configured here:
# continue with the channel given by next_channel
self.log.info('invalid channel %d!', channelno)
channelno = self.next_channel(channelno)
if channelno is None:
raise ValueError('no channels enabled')
self.write_target(channelno)
chan = self.channels.get(self.value)
chan.read_autorange()
chan.fix_autorange() # check for toggled autorange button
def write_switch_delay(self, value):
    """remember the written switch_delay and pass it to the inherited method

    the remembered value is used again by write_use_common_delays

    :param value: the new switch delay
    :return: the result of the inherited write_switch_delay
    """
    self._switch_delay = value
    return super().write_switch_delay(value)
def write_measure_delay(self, value):
    """remember the written measure_delay and pass it to the inherited method

    the remembered value is used again by write_use_common_delays

    :param value: the new measure delay
    :return: the result of the inherited write_measure_delay
    """
    self._measure_delay = value
    return super().write_measure_delay(value)
def write_use_common_delays(self, value):
    """switch between common delays and the delays of the channels

    :param value: True for using common delays
    :return: the unchanged value
    """
    if not value:
        return value
    # use values from a previous change, instead of
    # the values from the current channel
    if self._measure_delay is not None:
        self.measure_delay = self._measure_delay
    if self._switch_delay is not None:
        self.switch_delay = self._switch_delay
    return value
def set_delays(self, chan):
    """bring the delays of the switcher and of a channel in agreement

    with use_common_delays, the channel takes over dwell, pause and filter
    from the switcher, else the switcher takes over the delays of the channel

    :param chan: the channel object
    """
    if not self.use_common_delays:
        # switch_delay and measure_delay is changing with channel
        self.switch_delay = chan.pause + chan.filter
        self.measure_delay = chan.dwell
        return
    # write to the channel only what differs
    if chan.dwell != self.measure_delay:
        chan.write_dwell(self.measure_delay)
    if chan.pause != self.common_pause:
        chan.write_pause(self.common_pause)
    # the filter time is what remains of switch_delay after the pause
    filter_ = max(0, self.switch_delay - self.common_pause)
    if chan.filter != filter_:
        chan.write_filter(filter_)
def set_active_channel(self, chan):
    """select a channel on the device

    :param chan: the channel object to be activated
    """
    # the second argument 0 is the autoscan flag of the SCAN command
    reply = self.communicate('SCAN %d,0;SCAN?' % chan.channel)
    self.value = parse(reply)[0]
    # restart the time used by the channel for waiting after a range change
    chan._last_range_change = time.monotonic()
    self.set_delays(chan)
class ResChannel(Channel):
    """resistance channel on Lakeshore 372"""

    # resistance ranges by name: 2mOhm (1) ... 63.2MOhm (22)
    RES_RANGE = {key: i+1 for i, key in list(
        enumerate(mag % val for mag in ['%gmOhm', '%gOhm', '%gkOhm', '%gMOhm']
                  for val in [2, 6.32, 20, 63.2, 200, 632]))[:-2]}
    # current excitations by name: 1pA (1) ... 31.6mA (22)
    CUR_RANGE = {key: i + 1 for i, key in list(
        enumerate(mag % val for mag in ['%gpA', '%gnA', '%guA', '%gmA']
                  for val in [1, 3.16, 10, 31.6, 100, 316]))[:-2]}
    # voltage excitations by name: 2uV (1) ... 632mV (12)
    VOLT_RANGE = {key: i + 1 for i, key in list(
        enumerate(mag % val for mag in ['%guV', '%gmV']
                  for val in [2, 6.32, 20, 63.2, 200, 632]))}
    # full scale resistance in Ohm, indexed by the range number
    RES_SCALE = [2 * 10 ** (0.5 * i) for i in range(-7, 16)]  # RES_SCALE[0] is not used
    # upper limit applied by the software autorange when increasing the range
    MAX_RNG = len(RES_SCALE) - 2  # was - 1

    channel = Property('the Lakeshore channel', datatype=IntRange(1, 16), export=False)

    value = Parameter(datatype=FloatRange(unit='Ohm'))
    pollinterval = Parameter(visibility=3, default=1)
    range = Parameter('reading range', readonly=False,
                      datatype=EnumType(**RES_RANGE))
    minrange = Parameter('minimum range for software autorange', readonly=False, default=1,
                         datatype=EnumType(**RES_RANGE))
    autorange = Parameter('autorange', datatype=BoolType(),
                          readonly=False, default=1)
    iexc = Parameter('current excitation', datatype=EnumType(off=0, **CUR_RANGE), readonly=False)
    vexc = Parameter('voltage excitation', datatype=EnumType(off=0, **VOLT_RANGE), readonly=False)
    enabled = Parameter('is this channel enabled?', datatype=BoolType(), readonly=False)
    pause = Parameter('pause after channel change', datatype=FloatRange(3, 60, unit='s'), readonly=False)
    dwell = Parameter('dwell time with autoscan', datatype=FloatRange(1, 200, unit='s'), readonly=False)
    filter = Parameter('filter time', datatype=FloatRange(1, 200, unit='s'), readonly=False)

    # 'init': autorange is switched on by the first call of fix_autorange,
    # True: the autorange button was pressed, False: nothing to do
    _toggle_autorange = 'init'
    _prev_rdgrng = (1, 1)  # last read values for iscur and exc
    _last_range_change = 0
    _value = None  # if not None, a fresh value
    # parameters handled together by read_rdgrng / write_rdgrng
    rdgrng_params = 'range', 'iexc', 'vexc'
    # parameters handled together by read_inset / write_inset
    inset_params = 'enabled', 'pause', 'dwell'
    STATUS_MASK = 0x3f  # mask T_OVER and T_UNDER
def communicate(self, command):
    """send a command over the connection of the switcher

    :param command: the command string
    :return: the unparsed reply
    """
    # TODO: remove this and change to query
    return self.switcher.communicate(command)
def query(self, command):
    """send a command over the connection of the switcher and parse the reply

    :param command: the command string
    :return: a tuple with the converted items of the reply
    """
    reply = self.switcher.communicate(command)
    return parse(reply)
def change(self, command, *args):
    """set values on the device and return the parsed readback

    example: change('PID 1', 10, 20, 0) sends 'PID 1,10,20,0;PID?1'

    :param command: the command name, optionally followed by a space and a
        leading argument, which is used for both the setting and the query
    :param args: the numeric values to be set (formatted with 'g')
    :return: a tuple with the converted items of the reply to the query
    """
    cmd, _, qarg = command.partition(' ')
    # no leading argument: avoid an empty first field in the argument list
    fields = ([qarg] if qarg else []) + [f'{a:g}' for a in args]
    # set first, then query, as in the other set commands used in this file
    # (e.g. 'RANGE <loop>,<range>;RANGE?<loop>')
    return parse(self.switcher.communicate(f"{cmd} {','.join(fields)};{cmd}?{qarg}"))
def read_status(self):
    """determine the status from the reading status bits of the device

    :return: [status code, status text]
    """
    if not self.enabled:
        return [self.Status.DISABLED, 'disabled']
    if self.switcher.isBusy():
        # keep the current status while the switcher is busy
        return self.status
    # bits not contained in STATUS_MASK are ignored
    bits = int(self.communicate('RDGST?%d' % self.channel)) & self.STATUS_MASK
    statustext = ' '.join(formatStatusBits(bits, STATUS_BIT_LABELS))
    if statustext:
        return [self.Status.ERROR, statustext]
    return [self.Status.IDLE, '']
def _read_value(self):
"""read value, without update"""
# NOTE(review): the indentation of this block is missing in this copy, so the
# nesting of the statements below is not certain - confirm against the repository.
now = time.monotonic()
# too early: less than pause + 0.5 s have passed since the last range change
# or since the start of the last channel switch
if now - 0.5 < max(self._last_range_change, self.switcher._start_switch) + self.pause:
return None
result = float(self.communicate('RDGR?%d' % self.channel))
# a reading of exactly 0: with autorange, the next higher range
# (limited to MAX_RNG) is written
if result == 0:
if self.autorange:
rng = int(max(self.minrange, self.range)) # convert from enum to int
self.write_range(min(self.MAX_RNG, rng + 1))
return None
if self.autorange:
self.fix_autorange()
if now - 0.5 > self._last_range_change + self.pause:
rng = int(max(self.minrange, self.range)) # convert from enum to int
# status below ERROR: compare the reading with RES_SCALE of the current range
if self.status[0] < self.Status.ERROR:
if abs(result) > self.RES_SCALE[rng]:
# NOTE(review): the literal 22 differs from self.MAX_RNG
# (len(RES_SCALE) - 2 = 21) used for the other two range increases
# in this method - confirm which upper limit is intended
if rng < 22:
rng += 1
else:
lim = 0.2
while rng > self.minrange and abs(result) < lim * self.RES_SCALE[rng]:
rng -= 1
lim -= 0.05 # not more than 4 steps at once
# effectively: <0.16 %: 4 steps, <1%: 3 steps, <5%: 2 steps, <20%: 1 step
elif rng < self.MAX_RNG:
self.log.debug('increase range due to error %d', rng)
rng = min(self.MAX_RNG, rng + 1)
if rng != self.range:
self.log.debug('range change to %d', rng)
self.write_range(rng)
self._last_range_change = now
return result
def read_value(self):
    """return a fresh reading if available, else the previous value

    :return: the resistance, or 0 for a disabled channel while the
        switcher is busy
    """
    if self.switcher.isBusy():
        return self.value if self.enabled else 0
    # use the value already stored in _value (see is_switching), if any
    fresh = self._value
    if fresh is None:
        fresh = self._read_value()
    self._value = None
    return self.value if fresh is None else fresh
def is_switching(self, now, last_switch, switch_delay):
    """extend the inherited check by the time since the last range change

    :param now: passed on to the inherited is_switching
    :param last_switch: passed on to the inherited is_switching
    :param switch_delay: passed on to the inherited is_switching
    :return: the result of the inherited check relative to last_switch or,
        when this is false, relative to the last range change
    """
    result = super().is_switching(now, last_switch, switch_delay)
    if result:
        return result
    # the time since last switch has passed
    self._value = self._read_value()  # for autorange
    # now check for the time since last range change
    return super().is_switching(now, self._last_range_change, switch_delay)
@CommonReadHandler(rdgrng_params)
def read_rdgrng(self):
    """read range, iexc and vexc with one RDGRNG? query"""
    iscur, exc, rng, autorange, excoff = parse(
        self.communicate('RDGRNG?%d' % self.channel))
    self._prev_rdgrng = iscur, exc
    if autorange and not self._toggle_autorange:
        # pressed autorange button: to be handled by fix_autorange
        self._toggle_autorange = True
    # only the excitation of the active mode is non zero,
    # both are zero when the excitation is off
    if excoff:
        iexc = vexc = 0
    elif iscur:
        iexc, vexc = exc, 0
    else:
        iexc, vexc = 0, exc
    if (rng, iexc, vexc) != (self.range, self.iexc, self.vexc):
        self._last_range_change = time.monotonic()
    self.range, self.iexc, self.vexc = rng, iexc, vexc
@CommonWriteHandler(rdgrng_params)
def write_rdgrng(self, change):
    """write range, iexc and vexc with one RDGRNG command

    :param change: the object giving access to the values to be written
        by the keys 'range', 'iexc' and 'vexc'
    """
    self.read_range()  # make sure autorange is handled
    if 'vexc' in change:  # in case vexc is changed, do not consider iexc
        change['iexc'] = 0
    # we need '!= 0' below, as bool(enum) is always True!
    if change['iexc'] != 0:
        iscur, exc, excoff = 1, change['iexc'], 0
    elif change['vexc'] != 0:
        iscur, exc, excoff = 0, change['vexc'], 0
    else:
        # excitation off: set to last read values
        iscur, exc = self._prev_rdgrng
        excoff = 1
    rng = change['range']
    if self.autorange and rng < self.minrange:
        # with autorange, do not go below minrange
        rng = self.minrange
    # the 5th argument (0) is the autorange flag as read back in read_rdgrng
    self.communicate('RDGRNG %d,%d,%d,%d,%d,%d;*OPC?' % (
        self.channel, iscur, exc, rng, 0, excoff))
    self.read_range()
def fix_autorange(self):
    """apply a pending change of the software autorange

    on the first call ('init') autorange is switched on, after a detected
    press of the autorange button (see read_rdgrng) it is toggled
    """
    pending = self._toggle_autorange
    if not pending:
        return
    if pending == 'init':
        self.write_autorange(True)
    else:
        self.write_autorange(not self.autorange)
    self._toggle_autorange = False
@CommonReadHandler(inset_params)
def read_inset(self):
    """read enabled, dwell and pause with one INSET? query"""
    # ignore curve no and temperature coefficient
    self.enabled, self.dwell, self.pause, _, _ = parse(
        self.communicate('INSET?%d' % self.channel))
@CommonWriteHandler(inset_params)
def write_inset(self, change):
    """write enabled, dwell and pause with one INSET command

    :param change: the object giving access to the values to be written
        by the keys 'enabled', 'dwell' and 'pause'
    """
    # curve no and temperature coefficient are read first in order to
    # write them back unchanged
    _, _, _, curve, tempco = parse(
        self.communicate('INSET?%d' % self.channel))
    command = 'INSET %d,%d,%d,%d,%d,%d;INSET?%d' % (
        self.channel, change['enabled'], change['dwell'], change['pause'], curve, tempco,
        self.channel)
    self.enabled, self.dwell, self.pause, _, _ = parse(self.communicate(command))
    if 'enabled' in change and change['enabled']:
        # switch to enabled channel
        self.switcher.write_target(self.channel)
    elif self.switcher.target == self.channel:
        # this is the target channel: update the delays of the switcher
        self.switcher.set_delays(self)
def read_filter(self):
    """read the filter time

    :return: the settle time from the FILTER? reply, 0 when the filter is off
    """
    on, settle, _ = parse(self.communicate('FILTER?%d' % self.channel))
    if not on:
        return 0
    return settle
def write_filter(self, value):
    """write the filter time

    :param value: the filter time, 0 for switching the filter off
    :return: the settle time read back, 0 when the filter is off
    """
    on = 1 if value else 0
    # a settle time of at least 1 is sent, even when the filter is switched off
    value = max(1, value)
    on, settle, _ = parse(self.communicate(
        'FILTER %d,%d,%g,80;FILTER?%d' % (self.channel, on, value, self.channel)))
    return settle if on else 0
class TemperatureChannel(ResChannel):
    """resistance channel with an additional reading in K"""
    raw = Parameter('raw reistance value', FloatRange(unit='Ohm'))
    value = Parameter('temperature sensor', FloatRange(0, unit='K'))
    STATUS_MASK = 0xff  # accept T_OVER and T_UNDER

    def _is_active(self):
        """whether this channel is both the current and the target channel"""
        return self.channel == self.switcher.value == self.switcher.target

    def read_raw(self):
        """read the resistance

        :return: a new reading when this channel is active (the previous
            raw value when no reading is available), else the previous raw
            value, or 0 if there is none
        """
        if self._is_active():
            return self._read_value() or self.raw
        return self.raw or 0

    def read_value(self):
        """read the temperature

        :return: the reply to RDGK? when this channel is active, else the
            previous value, or 0 for a disabled channel
        """
        if self._is_active():
            return float(self.communicate('RDGK?%d' % self.channel))
        return self.value if self.enabled else 0
class TemperatureLoop(HasConvergence, TemperatureChannel, Drivable):
    """temperature channel used as input of a control loop"""
    loop = Property('lakshore loop', IntRange(0, 1), default=0)  # TODO: implemented specific issues of loop 1
    target = Parameter('setpoint', FloatRange(0, unit='$'))
    control_active = Parameter('we are controlling', BoolType(), default=0)
    minheater = Parameter('minimal heater current', FloatRange(0, 0.01, unit='A'), readonly=False, default=0)
    # heater range names mapped to their numbers 0 (off) ... 8 (100mA)
    HTRRNG = {n: i for i, n in enumerate(['off', '30uA', '100uA', '300uA', '1mA', '3mA', '10mA', '30mA', '100mA'])}
    htrrng = Parameter('', EnumType(HTRRNG), readonly=False)
    # set by write_target, cleared by control_off
    _control_active = False
def doPoll(self):
    """do the inherited polling, then check the heater range"""
    super().doPoll()
    self.set_htrrng()
@Command
def control_off(self):
    """switch control off"""
    # clear the flag first, so that set_htrrng does not touch the heater range
    self._control_active = False
    # heater range 0 is 'off' (see HTRRNG)
    self.communicate(f'RANGE {self.loop},0;RANGE?{self.loop}')
    self.read_control_active()
def min_percent(self):
    """convert minheater to percent of the current of the heater range

    :return: the percentage, limited to 100 and rounded to 2 digits
    """
    # full current for the heater range number n taken as 10 ** (n / 2 - 5)
    full_current = 10 ** (int(self.htrrng) * 0.5 - 5.0)
    fraction = min(1.0, self.minheater / full_current)
    return round(100 * fraction, 2)
def set_htrrng(self):
    """while controlling, make the heater range on the device follow htrrng"""
    if not self._control_active:
        return
    newhtr = int(self.htrrng)
    htrrng = int(self.communicate(f'RANGE?{self.loop}'))
    if htrrng == newhtr:
        return
    if newhtr:
        self.log.info('switched heater on %d', newhtr)
    self.communicate(f'RANGE {self.loop},{newhtr};RANGE?{self.loop}')
def read_control_active(self):
    """check the heater range and return whether control is active

    :return: the internal control flag
    """
    self.set_htrrng()
    return self._control_active
def read_target(self):
    """read the setpoint

    :return: the setpoint from the device while controlling, else 0
    """
    if not self._control_active:
        return 0
    return float(self.communicate(f'SETP?{self.loop}'))
def write_htrrng(self, value):
    """set the heater range

    the range is sent to the device only while controlling

    :param value: the new heater range
    :return: the unchanged value
    """
    if self._control_active:
        # this must be an f-string: without the prefix, the placeholders
        # were sent literally to the device
        self.communicate(f'RANGE {self.loop},{int(value)};*OPC?')
    return value
def write_target(self, target):
# Start controlling to a new setpoint; returns the given target.
# NOTE(review): the indentation of this block is missing in this copy, so it is
# not certain which statements depend on 'outmode != prev' - confirm against
# the repository.
# the wanted OUTMODE settings, with this channel as second item
outmode = (5, self.channel, 0, 0, 1, 3)
prev = parse(self.communicate(f'OUTMODE?{self.loop}'))
if outmode != prev:
self.communicate(f'OUTMODE {self.loop},%g,%g,%g,%g,%g,%g;*OPC?' % tuple(outmode))
# the MOUT value is minheater converted to percent (see min_percent)
self.communicate(f'MOUT {self.loop},{self.min_percent()};*OPC?')
# the control flag is cleared on all channels of the switcher and then set
# on this one
for chan in self.switcher.channels.values():
chan._control_active = False
self._control_active = True
self.read_control_active()
self.convergence_start()
# do not return the readback value, as it might not yet be correct
self.communicate(f'SETP {self.loop},{target};*OPC?')
return target
#def write_ctrlpars(self, ctrlpars):
# p, i, d = self.change(f'PID {self.loop}', ctrlpars['p'], ctrlpars['i'], ctrlpars['d'])
# return {'p': p, 'i': i, 'd': d}
#def read_ctrlpars(self):
# p, i, d = self.query(f'PID? {self.loop}')
# return {'p': p, 'i': i, 'd': d}