frappy_psi.ls372: add TemperatureSensor and TemperatureLoop

This commit is contained in:
l_samenv 2023-06-29 15:06:45 +02:00 committed by Markus Zolliker
parent ef5f4cd2f3
commit be7c9eec8c
2 changed files with 123 additions and 24 deletions

View File

@ -149,7 +149,7 @@ class ChannelSwitcher(Drivable):
def write_target(self, channel):
if channel not in self.channels:
raise ValueError(f'{channel!r} is no valid channel')
if channel == self.target and self.channels[channel].enabled:
if channel == self.value and self.channels[channel].enabled:
return channel
chan = self.channels[channel]
chan.enabled = True

View File

@ -32,8 +32,8 @@ import time
import frappy.io
from frappy.datatypes import BoolType, EnumType, FloatRange, IntRange
from frappy.lib import formatStatusBits
from frappy.core import Done, Drivable, Parameter, Property, CommonReadHandler, CommonWriteHandler
from frappy.io import HasIO
from frappy.core import Command, Drivable, Parameter, Property, CommonReadHandler, CommonWriteHandler
from frappy_psi.convergence import HasConvergence
from frappy_psi.channelswitcher import Channel, ChannelSwitcher
Status = Drivable.Status
@ -54,7 +54,7 @@ def parse1(string):
def parse(reply):
return [parse1(s) for s in reply.split(',')]
return tuple(parse1(s) for s in reply.split(','))
class StringIO(frappy.io.StringIO):
@ -62,7 +62,7 @@ class StringIO(frappy.io.StringIO):
wait_before = 0.05
class Switcher(HasIO, ChannelSwitcher):
class Switcher(frappy.io.HasIO, ChannelSwitcher):
value = Parameter(datatype=IntRange(1, 16))
target = Parameter(datatype=IntRange(1, 16))
use_common_delays = Parameter('use switch_delay and measure_delay instead of the channels pause and dwell',
@ -73,14 +73,25 @@ class Switcher(HasIO, ChannelSwitcher):
_measure_delay = None
_switch_delay = None
def startModule(self, start_events):
super().startModule(start_events)
def initialReads(self):
# disable unused channels
display = []
for ch in range(1, 16):
if ch not in self._channels:
chan = self.channels.get(ch)
if chan:
if hasattr(chan, 'raw'):
display.append((ch, 2))
display.append((ch, 1))
else:
self.communicate('INSET %d,0,0,0,0,0;INSET?%d' % (ch, ch))
if len(display) > 4:
self.communicate('DISPLAY 2,2,1;*OPC?')
else:
self.communicate('DISPLAY 2,1,1;*OPC?')
for i, (ch, unit) in enumerate(display):
self.communicate(f'DISPFLD {i+1},{ch},{unit};*OPC?')
channelno, autoscan = parse(self.communicate('SCAN?'))
if channelno in self._channels and self._channels[channelno].enabled:
if channelno in self.channels and self.channels[channelno].enabled:
if not autoscan:
return # nothing to do
else:
@ -88,7 +99,7 @@ class Switcher(HasIO, ChannelSwitcher):
if channelno is None:
self.status = 'ERROR', 'no enabled channel'
return
self.communicate('SCAN %d,0;SCAN?' % channelno)
self.set_active_channel(self.channels[channelno])
def doPoll(self):
"""poll buttons
@ -96,7 +107,7 @@ class Switcher(HasIO, ChannelSwitcher):
and check autorange during filter time
"""
super().doPoll()
self._channels[self.target]._read_value() # check range or read
self.channels[self.target]._read_value() # check range or read
channelno, autoscan = parse(self.communicate('SCAN?'))
if autoscan:
# pressed autoscan button: switch off HW autoscan and toggle soft autoscan
@ -104,17 +115,18 @@ class Switcher(HasIO, ChannelSwitcher):
self.communicate('SCAN %d,0;SCAN?' % self.value)
if channelno != self.value:
# channel changed by keyboard, do not yet return new channel
self.log.info('channel changed by keyboard %d!', channelno)
self.write_target(channelno)
chan = self._channels.get(channelno)
chan = self.channels.get(channelno)
if chan is None:
self.log.info('invalid channel %d!', channelno)
channelno = self.next_channel(channelno)
if channelno is None:
raise ValueError('no channels enabled')
self.write_target(channelno)
chan = self._channels.get(self.value)
chan = self.channels.get(self.value)
chan.read_autorange()
chan.fix_autorange() # check for toggled autorange button
return Done
def write_switch_delay(self, value):
self._switch_delay = value
@ -149,7 +161,8 @@ class Switcher(HasIO, ChannelSwitcher):
self.measure_delay = chan.dwell
def set_active_channel(self, chan):
self.communicate('SCAN %d,0;SCAN?' % chan.channel)
channelno = parse(self.communicate('SCAN %d,0;SCAN?' % chan.channel))[0]
self.value = channelno
chan._last_range_change = time.monotonic()
self.set_delays(chan)
@ -167,7 +180,7 @@ class ResChannel(Channel):
enumerate(mag % val for mag in ['%guV', '%gmV']
for val in [2, 6.32, 20, 63.2, 200, 632]))}
RES_SCALE = [2 * 10 ** (0.5 * i) for i in range(-7, 16)] # RES_SCALE[0] is not used
MAX_RNG = len(RES_SCALE) - 1
MAX_RNG = len(RES_SCALE) - 2 # was - 1
channel = Property('the Lakeshore channel', datatype=IntRange(1, 16), export=False)
@ -191,6 +204,7 @@ class ResChannel(Channel):
_last_range_change = 0
rdgrng_params = 'range', 'iexc', 'vexc'
inset_params = 'enabled', 'pause', 'dwell'
STATUS_MASK = 0x3f # mask T_OVER and T_UNDER
def communicate(self, command):
return self.switcher.communicate(command)
@ -199,9 +213,8 @@ class ResChannel(Channel):
if not self.enabled:
return [self.Status.DISABLED, 'disabled']
if not self.channel == self.switcher.value == self.switcher.target:
return Done
result = int(self.communicate('RDGST?%d' % self.channel))
result &= 0x37 # mask T_OVER and T_UNDER (change this when implementing temperatures instead of resistivities)
return self.status
result = int(self.communicate('RDGST?%d' % self.channel)) & self.STATUS_MASK
statustext = ' '.join(formatStatusBits(result, STATUS_BIT_LABELS))
if statustext:
return [self.Status.ERROR, statustext]
@ -212,13 +225,14 @@ class ResChannel(Channel):
now = time.monotonic()
if now + 0.5 < max(self._last_range_change, self.switcher._start_switch) + self.pause:
return None
result = self.communicate('RDGR?%d' % self.channel)
result = float(result)
result = float(self.communicate('RDGR?%d' % self.channel))
if result == 0:
return None
if self.autorange:
self.fix_autorange()
if now + 0.5 > self._last_range_change + self.pause:
rng = int(max(self.minrange, self.range)) # convert from enum to int
if self.status[1] == '':
if self.status[0] < self.Status.ERROR:
if abs(result) > self.RES_SCALE[rng]:
if rng < 22:
rng += 1
@ -237,8 +251,8 @@ class ResChannel(Channel):
def read_value(self):
if self.channel == self.switcher.value == self.switcher.target:
return self._read_value()
return Done # return previous value
return self._read_value() or self.value
return self.value if self.enabled else 0
def is_switching(self, now, last_switch, switch_delay):
last_switch = max(last_switch, self._last_range_change)
@ -327,3 +341,88 @@ class ResChannel(Channel):
if not on:
settle = 0
return settle
class TemperatureChannel(ResChannel):
raw = Parameter('raw reistance value', FloatRange(unit='Ohm'))
value = Parameter('temperature sensor', FloatRange(0, unit='K'))
STATUS_MASK = 0xff # accept T_OVER and T_UNDER
def read_raw(self):
if self.channel == self.switcher.value == self.switcher.target:
return self._read_value() or self.raw
return self.raw or 0
def read_value(self):
if self.channel == self.switcher.value == self.switcher.target:
return float(self.communicate('RDGK?%d' % self.channel))
return self.value if self.enabled else 0
class TemperatureLoop(HasConvergence, TemperatureChannel, Drivable):
target = Parameter('setpoint', FloatRange(0, unit='$'))
control_active = Parameter('we are controlling', BoolType(), default=0)
minheater = Parameter('minimal heater current', FloatRange(0, 0.01, unit='A'), readonly=False, default=0)
HTRRNG = {n: i for i, n in enumerate(['off', '30uA', '100uA', '300uA', '1mA', '3mA', '10mA', '30mA', '100mA'])}
htrrng = Parameter('', EnumType(HTRRNG), readonly=False)
_control_active = False
_underflow = False
@Command
def control_off(self):
"""switch control off"""
self._control_active = False
self.communicate(f'RANGE 0,0;RANGE?0')
self.read_control_active()
def min_percent(self):
curr = 10 ** (int(self.htrrng) * 0.5 - 5.0)
result = 100 * min(1.0, self.minheater / curr)
return round(result, 2)
def set_htrrng(self):
if self._control_active:
newhtr = int(self.htrrng) if self._control_active else 0
htrrng = int(self.communicate('RANGE?0'))
if htrrng != newhtr:
if newhtr:
self.log.info('switched heater on %d', newhtr)
self.communicate(f'RANGE 0,{newhtr};RANGE?0')
if self.minheater:
self.log.info('underflow open loop')
self._underflow = True
self.communicate(f'OUTMODE 0,2,{self.channel},0,0,1,3;*OPC?')
self.communicate(f'MOUT {self.min_percent()};*OPC?')
elif self._underflow:
self.log.info('switch to control after underflow')
self._underflow = False
self.communicate(f'OUTMODE 0,5,{self.channel},0,0,1,3;*OPC?')
self.communicate(f'SETP 0,{self.target};SETP?0')
def read_control_active(self):
self.set_htrrng()
return self._control_active
def read_target(self):
if self._control_active:
return float(self.communicate('SETP?0'))
return 0
def write_htrrng(self, value):
if self._control_active:
self.communicate('RANGE 0,{int(value)};*OPC?')
return value
def write_target(self, target):
outmode = (5, self.channel, 0, 0, 1, 3)
prev = parse(self.communicate('OUTMODE?0'))
if outmode != prev:
self.communicate('OUTMODE 0, %g,%g,%g,%g,%g,%g;*OPC?' % tuple(outmode))
self.communicate('MOUT 0,{self.min_percent()};*OPC?')
for chan in self.switcher.channels.values():
chan._control_active = False
self._control_active = True
self.read_control_active()
self.convergence_start()
return float(self.communicate(f'SETP 0,{target};SETP?0'))