diff --git a/frappy_psi/channelswitcher.py b/frappy_psi/channelswitcher.py index 8a90e8e..f266b31 100644 --- a/frappy_psi/channelswitcher.py +++ b/frappy_psi/channelswitcher.py @@ -149,7 +149,7 @@ class ChannelSwitcher(Drivable): def write_target(self, channel): if channel not in self.channels: raise ValueError(f'{channel!r} is no valid channel') - if channel == self.target and self.channels[channel].enabled: + if channel == self.value and self.channels[channel].enabled: return channel chan = self.channels[channel] chan.enabled = True diff --git a/frappy_psi/ls372.py b/frappy_psi/ls372.py index 633dcaf..06a6069 100644 --- a/frappy_psi/ls372.py +++ b/frappy_psi/ls372.py @@ -32,8 +32,8 @@ import time import frappy.io from frappy.datatypes import BoolType, EnumType, FloatRange, IntRange from frappy.lib import formatStatusBits -from frappy.core import Done, Drivable, Parameter, Property, CommonReadHandler, CommonWriteHandler -from frappy.io import HasIO +from frappy.core import Command, Drivable, Parameter, Property, CommonReadHandler, CommonWriteHandler +from frappy_psi.convergence import HasConvergence from frappy_psi.channelswitcher import Channel, ChannelSwitcher Status = Drivable.Status @@ -54,7 +54,7 @@ def parse1(string): def parse(reply): - return [parse1(s) for s in reply.split(',')] + return tuple(parse1(s) for s in reply.split(',')) class StringIO(frappy.io.StringIO): @@ -62,7 +62,7 @@ class StringIO(frappy.io.StringIO): wait_before = 0.05 -class Switcher(HasIO, ChannelSwitcher): +class Switcher(frappy.io.HasIO, ChannelSwitcher): value = Parameter(datatype=IntRange(1, 16)) target = Parameter(datatype=IntRange(1, 16)) use_common_delays = Parameter('use switch_delay and measure_delay instead of the channels pause and dwell', @@ -73,14 +73,25 @@ class Switcher(HasIO, ChannelSwitcher): _measure_delay = None _switch_delay = None - def startModule(self, start_events): - super().startModule(start_events) + def initialReads(self): # disable unused channels + display = [] for ch in range(1, 16): - if ch not in self._channels: + chan = self.channels.get(ch) + if chan: + if hasattr(chan, 'raw'): + display.append((ch, 2)) + display.append((ch, 1)) + else: self.communicate('INSET %d,0,0,0,0,0;INSET?%d' % (ch, ch)) + if len(display) > 4: + self.communicate('DISPLAY 2,2,1;*OPC?') + else: + self.communicate('DISPLAY 2,1,1;*OPC?') + for i, (ch, unit) in enumerate(display): + self.communicate(f'DISPFLD {i+1},{ch},{unit};*OPC?') channelno, autoscan = parse(self.communicate('SCAN?')) - if channelno in self._channels and self._channels[channelno].enabled: + if channelno in self.channels and self.channels[channelno].enabled: if not autoscan: return # nothing to do else: @@ -88,7 +99,7 @@ class Switcher(HasIO, ChannelSwitcher): if channelno is None: self.status = 'ERROR', 'no enabled channel' return - self.communicate('SCAN %d,0;SCAN?' % channelno) + self.set_active_channel(self.channels[channelno]) def doPoll(self): """poll buttons @@ -96,7 +107,7 @@ class Switcher(HasIO, ChannelSwitcher): and check autorange during filter time """ super().doPoll() - self._channels[self.target]._read_value() # check range or read + self.channels[self.target]._read_value() # check range or read channelno, autoscan = parse(self.communicate('SCAN?')) if autoscan: # pressed autoscan button: switch off HW autoscan and toggle soft autoscan @@ -104,17 +115,18 @@ class Switcher(HasIO, ChannelSwitcher): self.communicate('SCAN %d,0;SCAN?' % self.value) if channelno != self.value: # channel changed by keyboard, do not yet return new channel + self.log.info('channel changed by keyboard %d!', channelno) self.write_target(channelno) - chan = self._channels.get(channelno) + chan = self.channels.get(channelno) if chan is None: + self.log.info('invalid channel %d!', channelno) channelno = self.next_channel(channelno) if channelno is None: raise ValueError('no channels enabled') self.write_target(channelno) - chan = self._channels.get(self.value) + chan = self.channels.get(self.value) chan.read_autorange() chan.fix_autorange() # check for toggled autorange button - return Done def write_switch_delay(self, value): self._switch_delay = value @@ -149,7 +161,8 @@ class Switcher(HasIO, ChannelSwitcher): self.measure_delay = chan.dwell def set_active_channel(self, chan): - self.communicate('SCAN %d,0;SCAN?' % chan.channel) + channelno = parse(self.communicate('SCAN %d,0;SCAN?' % chan.channel))[0] + self.value = channelno chan._last_range_change = time.monotonic() self.set_delays(chan) @@ -167,7 +180,7 @@ class ResChannel(Channel): enumerate(mag % val for mag in ['%guV', '%gmV'] for val in [2, 6.32, 20, 63.2, 200, 632]))} RES_SCALE = [2 * 10 ** (0.5 * i) for i in range(-7, 16)] # RES_SCALE[0] is not used - MAX_RNG = len(RES_SCALE) - 1 + MAX_RNG = len(RES_SCALE) - 2 # was - 1 channel = Property('the Lakeshore channel', datatype=IntRange(1, 16), export=False) @@ -191,6 +204,7 @@ class ResChannel(Channel): _last_range_change = 0 rdgrng_params = 'range', 'iexc', 'vexc' inset_params = 'enabled', 'pause', 'dwell' + STATUS_MASK = 0x3f # mask T_OVER and T_UNDER def communicate(self, command): return self.switcher.communicate(command) @@ -199,9 +213,8 @@ class ResChannel(Channel): if not self.enabled: return [self.Status.DISABLED, 'disabled'] if not self.channel == self.switcher.value == self.switcher.target: - return Done - result = int(self.communicate('RDGST?%d' % self.channel)) - result &= 0x37 # mask T_OVER and T_UNDER (change this when implementing temperatures instead of resistivities) + return self.status + result = int(self.communicate('RDGST?%d' % self.channel)) & self.STATUS_MASK statustext = ' '.join(formatStatusBits(result, STATUS_BIT_LABELS)) if statustext: return [self.Status.ERROR, statustext] @@ -212,13 +225,14 @@ class ResChannel(Channel): now = time.monotonic() if now + 0.5 < max(self._last_range_change, self.switcher._start_switch) + self.pause: return None - result = self.communicate('RDGR?%d' % self.channel) - result = float(result) + result = float(self.communicate('RDGR?%d' % self.channel)) + if result == 0: + return None if self.autorange: self.fix_autorange() if now + 0.5 > self._last_range_change + self.pause: rng = int(max(self.minrange, self.range)) # convert from enum to int - if self.status[1] == '': + if self.status[0] < self.Status.ERROR: if abs(result) > self.RES_SCALE[rng]: if rng < 22: rng += 1 @@ -237,8 +251,8 @@ class ResChannel(Channel): def read_value(self): if self.channel == self.switcher.value == self.switcher.target: - return self._read_value() - return Done # return previous value + return self._read_value() or self.value + return self.value if self.enabled else 0 def is_switching(self, now, last_switch, switch_delay): last_switch = max(last_switch, self._last_range_change) @@ -327,3 +341,88 @@ class ResChannel(Channel): if not on: settle = 0 return settle + + + +class TemperatureChannel(ResChannel): + raw = Parameter('raw reistance value', FloatRange(unit='Ohm')) + value = Parameter('temperature sensor', FloatRange(0, unit='K')) + STATUS_MASK = 0xff # accept T_OVER and T_UNDER + + def read_raw(self): + if self.channel == self.switcher.value == self.switcher.target: + return self._read_value() or self.raw + return self.raw or 0 + + def read_value(self): + if self.channel == self.switcher.value == self.switcher.target: + return float(self.communicate('RDGK?%d' % self.channel)) + return self.value if self.enabled else 0 + + +class TemperatureLoop(HasConvergence, TemperatureChannel, Drivable): + target = Parameter('setpoint', FloatRange(0, unit='$')) + control_active = Parameter('we are controlling', BoolType(), default=0) + minheater = Parameter('minimal heater current', FloatRange(0, 0.01, unit='A'), readonly=False, default=0) + HTRRNG = {n: i for i, n in enumerate(['off', '30uA', '100uA', '300uA', '1mA', '3mA', '10mA', '30mA', '100mA'])} + htrrng = Parameter('', EnumType(HTRRNG), readonly=False) + _control_active = False + _underflow = False + + @Command + def control_off(self): + """switch control off""" + self._control_active = False + self.communicate(f'RANGE 0,0;RANGE?0') + self.read_control_active() + + def min_percent(self): + curr = 10 ** (int(self.htrrng) * 0.5 - 5.0) + result = 100 * min(1.0, self.minheater / curr) + return round(result, 2) + + def set_htrrng(self): + if self._control_active: + newhtr = int(self.htrrng) if self._control_active else 0 + htrrng = int(self.communicate('RANGE?0')) + if htrrng != newhtr: + if newhtr: + self.log.info('switched heater on %d', newhtr) + self.communicate(f'RANGE 0,{newhtr};RANGE?0') + if self.minheater: + self.log.info('underflow open loop') + self._underflow = True + self.communicate(f'OUTMODE 0,2,{self.channel},0,0,1,3;*OPC?') + self.communicate(f'MOUT {self.min_percent()};*OPC?') + elif self._underflow: + self.log.info('switch to control after underflow') + self._underflow = False + self.communicate(f'OUTMODE 0,5,{self.channel},0,0,1,3;*OPC?') + self.communicate(f'SETP 0,{self.target};SETP?0') + + def read_control_active(self): + self.set_htrrng() + return self._control_active + + def read_target(self): + if self._control_active: + return float(self.communicate('SETP?0')) + return 0 + + def write_htrrng(self, value): + if self._control_active: + self.communicate('RANGE 0,{int(value)};*OPC?') + return value + + def write_target(self, target): + outmode = (5, self.channel, 0, 0, 1, 3) + prev = parse(self.communicate('OUTMODE?0')) + if outmode != prev: + self.communicate('OUTMODE 0, %g,%g,%g,%g,%g,%g;*OPC?' % tuple(outmode)) + self.communicate('MOUT 0,{self.min_percent()};*OPC?') + for chan in self.switcher.channels.values(): + chan._control_active = False + self._control_active = True + self.read_control_active() + self.convergence_start() + return float(self.communicate(f'SETP 0,{target};SETP?0'))