channel switcher for Lakeshore 370 with scanner

- add a general channel switcher module
- change ls370res code from IOHandler to rwhandlers
+ fix an issue with the poller when io module is placed below
  using modules in cfg file

after this, IOHandler stuff may be removed from Frappy

Change-Id: I787101fc1e365ae3e0453bfe59291e2011a1fe53
Reviewed-on: https://forge.frm2.tum.de/review/c/sine2020/secop/playground/+/28512
Tested-by: Jenkins Automated Tests <pedersen+jenkins@frm2.tum.de>
Reviewed-by: Enrico Faulhaber <enrico.faulhaber@frm2.tum.de>
Reviewed-by: Markus Zolliker <markus.zolliker@psi.ch>
This commit is contained in:
zolliker 2022-05-24 16:34:10 +02:00
parent 8eee7ab3b0
commit 4c94580cb9
6 changed files with 418 additions and 202 deletions

View File

@ -38,7 +38,7 @@ doc:
$(MAKE) -C doc html $(MAKE) -C doc html
lint: lint:
pylint -j 0 -f colorized -r n --rcfile=.pylintrc secop secop_* test pylint -f colorized -r n --rcfile=.pylintrc secop secop_* test
isort: isort:
@find test -name '*.py' -print0 | xargs -0 isort -e -m 2 -w 80 -ns __init__.py @find test -name '*.py' -print0 | xargs -0 isort -e -m 2 -w 80 -ns __init__.py

View File

@ -5,19 +5,24 @@ description = Lsc Simulation at PSI
[INTERFACE] [INTERFACE]
uri = tcp://5000 uri = tcp://5000
[res]
class = secop_psi.ls370res.ResChannel
channel = 3
description = resistivity
main = lsmain
io = lscom
[lsmain]
class = secop_psi.ls370res.Main
description = main control of Lsc controller
io = lscom
[lscom] [lscom]
class = secop_psi.ls370sim.Ls370Sim class = secop_psi.ls370sim.Ls370Sim
description = simulated serial communicator to a LS 370 description = simulated serial communicator to a LS 370
visibility = 3 visibility = 3
[sw]
class = secop_psi.ls370res.Switcher
description = channel switcher for Lsc controller
io = lscom
[a]
class = secop_psi.ls370res.ResChannel
channel = 1
description = resistivity
switcher = sw
[b]
class = secop_psi.ls370res.ResChannel
channel = 3
description = resistivity
switcher = sw

View File

@ -597,7 +597,7 @@ class Module(HasAccessibles):
self.io.polledModules.append(self) self.io.polledModules.append(self)
else: else:
self.triggerPoll = threading.Event() self.triggerPoll = threading.Event()
self.polledModules = [self] self.polledModules.append(self)
def startModule(self, start_events): def startModule(self, start_events):
"""runs after init of all modules """runs after init of all modules

View File

@ -0,0 +1,179 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
# *****************************************************************************
"""channel switcher Mixin
Example Config File:
[sw]
description=the switcher for blabla channels
class=secop_facility.module.YourChannelSwitcher
uri=...
[chan1]
description=channel 1
class=secop_facility.module.YourChannel
switcher=sw
[chan2]
...
"""
import time
from secop.datatypes import IntRange, BoolType, FloatRange
from secop.core import Attached, Property, Drivable, Parameter, Readable, Done
class ChannelSwitcher(Drivable):
"""base class for the channel switcher
minimum implementation:
- override :meth:`set_active_channel`
- <channel>.read_value() and <channel>.read_status() is called periodically
every <channel>.pollinterval on the active channel only
- <channel>.is_switching(...) is called every <switcher>.pollinterval
during switching period, until it is returning False
"""
value = Parameter('the current channel number', IntRange(), needscfg=False)
target = Parameter('channel to select', IntRange(), needscfg=False)
autoscan = Parameter('whether to scan automatically',
BoolType(), readonly=False, default=True)
pollinterval = Parameter(default=1, export=False)
switch_delay = Parameter('the time needed to switch between channels',
FloatRange(0, None), readonly=False, default=5)
measure_delay = Parameter('the time for staying at a channel',
FloatRange(0, None), readonly=False, default=2)
fast_poll = 0.1
_channels = None # dict <channel no> of <module object>
_start_measure = 0
_last_measure = 0
_start_switch = 0
_time_tol = 0.5
def earlyInit(self):
super().earlyInit()
self._channels = {}
def register_channel(self, mod):
"""register module"""
self._channels[mod.channel] = mod
def set_active_channel(self, chan):
"""tell the HW the active channel
:param chan: a channel object
to be implemented
"""
raise NotImplementedError
def next_channel(self, channelno):
next_channel = channelno
first_channel = None
for ch, mod in self._channels.items():
if mod.enabled:
if first_channel is None:
first_channel = ch
if next_channel == ch:
next_channel = None
elif next_channel is None:
next_channel = ch
break
else:
next_channel = first_channel
return next_channel
def read_status(self):
now = time.monotonic()
if self.status[0] == 'BUSY':
chan = self._channels[self.target]
if chan.is_switching(now, self._start_switch, self.switch_delay):
return Done
self.setFastPoll(False)
self.status = 'IDLE', 'measure'
self.value = self.target
self._start_measure = self._last_measure = now
chan.read_value()
chan.read_status()
if self.measure_delay > self._time_tol:
return Done
else:
chan = self._channels[self.value]
self.read_value() # this might modify autoscan or deadline!
if chan.enabled:
if self.target != self.value: # may happen after startup
self.target = self.value
next_measure = self._last_measure + chan.pollinterval
if now + self._time_tol > next_measure:
chan.read_value()
chan.read_status()
self._last_measure = next_measure
if not self.autoscan or now + self._time_tol < self._start_measure + self.measure_delay:
return Done
next_channel = self.next_channel(self.value)
if next_channel == self.value:
return 'IDLE', 'single channel'
if next_channel is None:
return 'ERROR', 'no enabled channel'
self.write_target(next_channel)
return self.status
def write_pollinterval(self, value):
self._time_tol = min(1, value) * 0.5
return value
def write_target(self, channel):
if channel not in self._channels:
raise ValueError('%r is no valid channel' % channel)
if channel == self.target and self._channels[channel].enabled:
return channel
chan = self._channels[channel]
chan.enabled = True
self.set_active_channel(chan)
self._start_switch = time.monotonic()
self.status = 'BUSY', 'change channel'
self.setFastPoll(True, self.fast_poll)
return channel
class Channel(Readable):
"""base class for channels
you should override the datatype of the channel property,
in order to match the datatype of the switchers value
"""
switcher = Attached()
channel = Property('channel number', IntRange())
enabled = Parameter('enabled flag', BoolType(), default=True)
value = Parameter(needscfg=False)
def initModule(self):
super().initModule()
self.switcher.register_channel(self)
def doPoll(self):
"""value and status are polled by switcher"""
def is_switching(self, now, last_switch, switch_delay):
"""returns True when switching is done"""
return now + self.switcher._time_tol < last_switch + switch_delay

View File

@ -18,37 +18,28 @@
# Module authors: # Module authors:
# Markus Zolliker <markus.zolliker@psi.ch> # Markus Zolliker <markus.zolliker@psi.ch>
# ***************************************************************************** # *****************************************************************************
"""LakeShore Model 370 resistance channel""" """LakeShore Model 370 resistance channel
implements autoscan and autorange by software.
when the autoscan or autorange button is pressed, the state is toggled,
and the hardware mode switched off again.
At startup, the configurable default mode is set, independent of
the hardware state.
"""
import time import time
from ast import literal_eval
import secop.iohandler import secop.io
from secop.datatypes import BoolType, EnumType, FloatRange, IntRange from secop.datatypes import BoolType, EnumType, FloatRange, IntRange
from secop.lib import formatStatusBits from secop.lib import formatStatusBits
from secop.modules import Attached, Done, \ from secop.core import Done, Drivable, Parameter, Property, CommonReadHandler, CommonWriteHandler
Drivable, Parameter, Property, Readable
from secop.io import HasIO from secop.io import HasIO
from secop_psi.channelswitcher import Channel, ChannelSwitcher
Status = Drivable.Status Status = Drivable.Status
class IOHandler(secop.iohandler.IOHandler):
CMDARGS = ['channel']
CMDSEPARATOR = ';'
def __init__(self, name, querycmd, replyfmt):
changecmd = querycmd.replace('?', ' ')
if not querycmd.endswith('?'):
changecmd += ','
super().__init__(name, querycmd, replyfmt, changecmd)
rdgrng = IOHandler('rdgrng', 'RDGRNG?%(channel)d', '%d,%d,%d,%d,%d')
inset = IOHandler('inset', 'INSET?%(channel)d', '%d,%d,%d,%d,%d')
filterhdl = IOHandler('filter', 'FILTER?%(channel)d', '%d,%d,%d')
scan = IOHandler('scan', 'SCAN?', '%d,%d')
STATUS_BIT_LABELS = 'CS_OVL VCM_OVL VMIX_OVL VDIF_OVL R_OVER R_UNDER T_OVER T_UNDER'.split() STATUS_BIT_LABELS = 'CS_OVL VCM_OVL VMIX_OVL VDIF_OVL R_OVER R_UNDER T_OVER T_UNDER'.split()
@ -57,163 +48,143 @@ class StringIO(secop.io.StringIO):
wait_before = 0.05 wait_before = 0.05
class Main(HasIO, Drivable): class Switcher(HasIO, ChannelSwitcher):
value = Parameter(datatype=IntRange(1, 16))
value = Parameter('the current channel', datatype=IntRange(0, 17)) target = Parameter(datatype=IntRange(1, 16))
target = Parameter('channel to select', datatype=IntRange(0, 17)) use_common_delays = Parameter('use switch_delay and measure_delay instead of the channels pause and dwell',
autoscan = Parameter('whether to scan automatically', datatype=BoolType(), readonly=False, default=False) BoolType(), readonly=False, default=False)
pollinterval = Parameter(default=1, export=False) common_pause = Parameter('pause with common delays', FloatRange(3, 200, unit='s'), readonly=False, default=3)
ioClass = StringIO ioClass = StringIO
_channel_changed = 0 # time of last channel change fast_poll = 1
_channels = None # dict <channel no> of <module object> _measure_delay = None
_switch_delay = None
def earlyInit(self):
super().earlyInit()
self._channels = {}
def register_channel(self, modobj):
self._channels[modobj.channel] = modobj
def startModule(self, start_events): def startModule(self, start_events):
super().startModule(start_events) super().startModule(start_events)
# disable unused channels
for ch in range(1, 16): for ch in range(1, 16):
if ch not in self._channels: if ch not in self._channels:
self.communicate('INSET %d,0,0,0,0,0;INSET?%d' % (ch, ch)) self.communicate('INSET %d,0,0,0,0,0;INSET?%d' % (ch, ch))
channelno, autoscan = literal_eval(self.communicate('SCAN?'))
if channelno in self._channels and self._channels[channelno].enabled:
if not autoscan:
return # nothing to do
else:
channelno = self.next_channel(channelno)
if channelno is None:
self.status = 'ERROR', 'no enabled channel'
return
self.communicate('SCAN %d,0;SCAN?' % channelno)
def read_value(self): def doPoll(self):
channel, auto = scan.send_command(self) """poll buttons
if channel not in self._channels:
return channel
if not self._channels[channel].enabled:
# channel was disabled recently, but still selected
nextchannel = 0
for ch, mobj in self._channels.items():
if mobj.enabled:
if ch > channel:
nextchannel = ch
break
if nextchannel == 0:
nextchannel = ch
if nextchannel:
self.write_target(nextchannel)
return 0
now = time.time() and check autorange during filter time
if channel != self.target: """
self._channel_changed = now super().doPoll()
self.target = channel self._channels[self.target]._read_value() # check range or read
self.autoscan = int(auto) channelno, autoscan = literal_eval(self.communicate('SCAN?'))
if now < self._channel_changed + self._channels[channel].pause + self._channels[channel].filter: if autoscan:
self.status = [Status.BUSY, 'switching'] # pressed autoscan button: switch off HW autoscan and toggle soft autoscan
return 0 self.autoscan = not self.autoscan
self.status = [Status.IDLE, ''] self.communicate('SCAN %d,0;SCAN?' % self.value)
return channel if channelno != self.value:
# channel changed by keyboard, do not yet return new channel
self.write_target(channelno)
chan = self._channels.get(channelno)
if chan is None:
channelno = self.next_channel(channelno)
if channelno is None:
raise ValueError('no channels enabled')
self.write_target(channelno)
chan = self._channels.get(self.value)
chan.read_autorange()
chan.fix_autorange() # check for toggled autorange button
return Done
def write_target(self, channel): def write_switch_delay(self, value):
scan.send_change(self, channel, self.autoscan) self._switch_delay = value
# self.communicate('SCAN %d,%d;SCAN?' % (channel, self.autoscan)) return super().write_switch_delay(value)
if channel != self.value:
self.value = 0
self._channel_changed = time.time()
self.status = [Status.BUSY, 'switching']
return channel
def write_autoscan(self, value): def write_measure_delay(self, value):
scan.send_change(self, self.value, value) self._measure_delay = value
# self.communicate('SCAN %d,%d;SCAN?' % (channel, self.autoscan)) return super().write_measure_delay(value)
def write_use_common_delays(self, value):
if value:
# use values from a previous change, instead of
# the values from the current channel
if self._measure_delay is not None:
self.measure_delay = self._measure_delay
if self._switch_delay is not None:
self.switch_delay = self._switch_delay
return value return value
def set_delays(self, chan):
if self.use_common_delays:
if chan.dwell != self.measure_delay:
chan.write_dwell(self.measure_delay)
if chan.pause != self.common_pause:
chan.write_pause(self.common_pause)
filter_ = max(0, self.switch_delay - self.common_pause)
if chan.filter != filter_:
chan.write_filter(filter_)
else:
# switch_delay and measure_delay is changing with channel
self.switch_delay = chan.pause + chan.filter
self.measure_delay = chan.dwell
class ResChannel(HasIO, Readable): def set_active_channel(self, chan):
"""temperature channel on Lakeshore 336""" self.communicate('SCAN %d,0;SCAN?' % chan.channel)
chan._last_range_change = time.monotonic()
self.set_delays(chan)
class ResChannel(Channel):
"""temperature channel on Lakeshore 370"""
RES_RANGE = {key: i+1 for i, key in list( RES_RANGE = {key: i+1 for i, key in list(
enumerate(mag % val for mag in ['%gmOhm', '%gOhm', '%gkOhm', '%gMOhm'] enumerate(mag % val for mag in ['%gmOhm', '%gOhm', '%gkOhm', '%gMOhm']
for val in [2, 6.32, 20, 63.2, 200, 632]))[:-2]} for val in [2, 6.32, 20, 63.2, 200, 632]))[:-2]}
RES_SCALE = [2 * 10 ** (0.5 * i) for i in range(-7, 16)] # RES_SCALE[0] is not used
CUR_RANGE = {key: i + 1 for i, key in list( CUR_RANGE = {key: i + 1 for i, key in list(
enumerate(mag % val for mag in ['%gpA', '%gnA', '%guA', '%gmA'] enumerate(mag % val for mag in ['%gpA', '%gnA', '%guA', '%gmA']
for val in [1, 3.16, 10, 31.6, 100, 316]))[:-2]} for val in [1, 3.16, 10, 31.6, 100, 316]))[:-2]}
VOLT_RANGE = {key: i + 1 for i, key in list( VOLT_RANGE = {key: i + 1 for i, key in list(
enumerate(mag % val for mag in ['%guV', '%gmV'] enumerate(mag % val for mag in ['%guV', '%gmV']
for val in [2, 6.32, 20, 63.2, 200, 632]))} for val in [2, 6.32, 20, 63.2, 200, 632]))}
RES_SCALE = [2 * 10 ** (0.5 * i) for i in range(-7, 16)] # RES_SCALE[0] is not used
ioClass = StringIO MAX_RNG = len(RES_SCALE) - 1
_main = None # main module
_last_range_change = 0 # time of last range change
channel = Property('the Lakeshore channel', datatype=IntRange(1, 16), export=False) channel = Property('the Lakeshore channel', datatype=IntRange(1, 16), export=False)
main = Attached()
value = Parameter(datatype=FloatRange(unit='Ohm')) value = Parameter(datatype=FloatRange(unit='Ohm'))
pollinterval = Parameter(visibility=3, default=1, export=False) pollinterval = Parameter(visibility=3, default=1)
range = Parameter('reading range', readonly=False, range = Parameter('reading range', readonly=False,
datatype=EnumType(**RES_RANGE), handler=rdgrng) datatype=EnumType(**RES_RANGE))
minrange = Parameter('minimum range for software autorange', readonly=False, default=1, minrange = Parameter('minimum range for software autorange', readonly=False, default=1,
datatype=EnumType(**RES_RANGE)) datatype=EnumType(**RES_RANGE))
autorange = Parameter('autorange', datatype=EnumType(off=0, hard=1, soft=2), autorange = Parameter('autorange', datatype=BoolType(),
readonly=False, handler=rdgrng, default=2) readonly=False, default=1)
iexc = Parameter('current excitation', datatype=EnumType(off=0, **CUR_RANGE), readonly=False, handler=rdgrng) iexc = Parameter('current excitation', datatype=EnumType(off=0, **CUR_RANGE), readonly=False)
vexc = Parameter('voltage excitation', datatype=EnumType(off=0, **VOLT_RANGE), readonly=False, handler=rdgrng) vexc = Parameter('voltage excitation', datatype=EnumType(off=0, **VOLT_RANGE), readonly=False)
enabled = Parameter('is this channel enabled?', datatype=BoolType(), readonly=False, handler=inset) enabled = Parameter('is this channel enabled?', datatype=BoolType(), readonly=False)
pause = Parameter('pause after channel change', datatype=FloatRange(3, 60), readonly=False, handler=inset) pause = Parameter('pause after channel change', datatype=FloatRange(3, 60, unit='s'), readonly=False)
dwell = Parameter('dwell time with autoscan', datatype=FloatRange(1, 200), readonly=False, handler=inset) dwell = Parameter('dwell time with autoscan', datatype=FloatRange(1, 200, unit='s'), readonly=False)
filter = Parameter('filter time', datatype=FloatRange(1, 200), readonly=False, handler=filterhdl) filter = Parameter('filter time', datatype=FloatRange(1, 200, unit='s'), readonly=False)
_trigger_read = False _toggle_autorange = 'init'
_prev_rdgrng = (1, 1) # last read values for icur and exc
_last_range_change = 0
rdgrng_params = 'range', 'iexc', 'vexc'
inset_params = 'enabled', 'pause', 'dwell'
def initModule(self): def communicate(self, command):
super().initModule() return self.switcher.communicate(command)
self._main = self.DISPATCHER.get_module(self.main)
self._main.register_channel(self)
def read_value(self):
if not self.enabled:
self.status = [self.Status.DISABLED, 'disabled']
return Done
if self.channel != self._main.value:
if self.channel == self._main.target:
self._trigger_read = True
return Done
if not self._trigger_read:
return Done
# we got here, when we missed the idle state of self._main
self._trigger_read = False
result = self.communicate('RDGR?%d' % self.channel)
result = float(result)
if self.autorange == 'soft':
now = time.time()
if now > self._last_range_change + self.pause:
rng = int(max(self.minrange, self.range)) # convert from enum to int
if self.status[1] == '':
if abs(result) > self.RES_SCALE[rng]:
if rng < 22:
rng += 1
self.log.info('chan %d: increased range to %.3g' %
(self.channel, self.RES_SCALE[rng]))
else:
lim = 0.2
while rng > self.minrange and abs(result) < lim * self.RES_SCALE[rng]:
rng -= 1
lim -= 0.05 # not more than 4 steps at once
# effectively: <0.16 %: 4 steps, <1%: 3 steps, <5%: 2 steps, <20%: 1 step
if lim != 0.2:
self.log.info('chan %d: lowered range to %.3g' %
(self.channel, self.RES_SCALE[rng]))
elif rng < 22:
rng = min(22, rng + 1)
self.log.info('chan: %d, %s, increased range to %.3g' %
(self.channel, self.status[1], self.RES_SCALE[rng]))
if rng != self.range:
self.write_range(rng)
self._last_range_change = now
return result
def read_status(self): def read_status(self):
if not self.enabled: if not self.enabled:
return [self.Status.DISABLED, 'disabled'] return [self.Status.DISABLED, 'disabled']
if self.channel != self.main.value: if not self.channel == self.switcher.value == self.switcher.target:
return Done return Done
result = int(self.communicate('RDGST?%d' % self.channel)) result = int(self.communicate('RDGST?%d' % self.channel))
result &= 0x37 # mask T_OVER and T_UNDER (change this when implementing temperatures instead of resistivities) result &= 0x37 # mask T_OVER and T_UNDER (change this when implementing temperatures instead of resistivities)
@ -222,63 +193,123 @@ class ResChannel(HasIO, Readable):
return [self.Status.ERROR, statustext] return [self.Status.ERROR, statustext]
return [self.Status.IDLE, ''] return [self.Status.IDLE, '']
def analyze_rdgrng(self, iscur, exc, rng, autorange, excoff): def _read_value(self):
result = dict(range=rng) """read value, without update"""
if autorange: now = time.monotonic()
result['autorange'] = 'hard' if now + 0.5 < max(self._last_range_change, self.switcher._start_switch) + self.pause:
# else: do not change autorange return None
self.log.debug('%s range %r %r %r' % (self.name, rng, autorange, self.autorange)) result = self.communicate('RDGR?%d' % self.channel)
if excoff: result = float(result)
result.update(iexc=0, vexc=0) if self.autorange:
elif iscur: self.fix_autorange()
result.update(iexc=exc, vexc=0) if now + 0.5 > self._last_range_change + self.pause:
else: rng = int(max(self.minrange, self.range)) # convert from enum to int
result.update(iexc=0, vexc=exc) if self.status[1] == '':
if abs(result) > self.RES_SCALE[rng]:
if rng < 22:
rng += 1
else:
lim = 0.2
while rng > self.minrange and abs(result) < lim * self.RES_SCALE[rng]:
rng -= 1
lim -= 0.05 # not more than 4 steps at once
# effectively: <0.16 %: 4 steps, <1%: 3 steps, <5%: 2 steps, <20%: 1 step
elif rng < self.MAX_RNG:
rng = min(self.MAX_RNG, rng + 1)
if rng != self.range:
self.write_range(rng)
self._last_range_change = now
return result return result
def change_rdgrng(self, change): def read_value(self):
iscur, exc, rng, autorange, excoff = change.readValues() if self.channel == self.switcher.value == self.switcher.target:
if change.doesInclude('vexc'): # in case vext is changed, do not consider iexc return self._read_value()
change.iexc = 0 return Done # return previous value
if change.iexc != 0: # we need '!= 0' here, as bool(enum) is always True!
def is_switching(self, now, last_switch, switch_delay):
last_switch = max(last_switch, self._last_range_change)
if now + 0.5 > last_switch + self.pause:
self._read_value() # adjust range only
return super().is_switching(now, last_switch, switch_delay)
@CommonReadHandler(rdgrng_params)
def read_rdgrng(self):
iscur, exc, rng, autorange, excoff = literal_eval(
self.communicate('RDGRNG?%d' % self.channel))
self._prev_rdgrng = iscur, exc
if autorange: # pressed autorange button
if not self._toggle_autorange:
self._toggle_autorange = True
iexc = 0 if excoff or not iscur else exc
vexc = 0 if excoff or iscur else exc
if (rng, iexc, vexc) != (self.range, self.iexc, self.vexc):
self._last_range_change = time.monotonic()
self.range, self.iexc, self.vexc = rng, iexc, vexc
@CommonWriteHandler(rdgrng_params)
def write_rdgrng(self, change):
self.read_range() # make sure autorange is handled
if 'vexc' in change: # in case vext is changed, do not consider iexc
change['iexc'] = 0
if change['iexc'] != 0: # we need '!= 0' here, as bool(enum) is always True!
iscur = 1 iscur = 1
exc = change.iexc exc = change['iexc']
excoff = 0 excoff = 0
elif change.vexc != 0: # we need '!= 0' here, as bool(enum) is always True! elif change['vexc'] != 0: # we need '!= 0' here, as bool(enum) is always True!
iscur = 0 iscur = 0
exc = change.vexc exc = change['vexc']
excoff = 0 excoff = 0
else: else:
iscur, exc = self._prev_rdgrng # set to last read values
excoff = 1 excoff = 1
rng = change.range rng = change['range']
if change.autorange == 'hard': if self.autorange:
autorange = 1 if rng < self.minrange:
else: rng = self.minrange
autorange = 0 self.communicate('RDGRNG %d,%d,%d,%d,%d,%d;*OPC?' % (
if change.autorange == 'soft': self.channel, iscur, exc, rng, 0, excoff))
if rng < self.minrange: self.read_range()
rng = self.minrange
self.autorange = change.autorange
return iscur, exc, rng, autorange, excoff
def analyze_inset(self, on, dwell, pause, curve, tempco): def fix_autorange(self):
return dict(enabled=on, dwell=dwell, pause=pause) if self._toggle_autorange:
if self._toggle_autorange == 'init':
self.write_autorange(True)
else:
self.write_autorange(not self.autorange)
self._toggle_autorange = False
def change_inset(self, change): @CommonReadHandler(inset_params)
_, _, _, curve, tempco = change.readValues() def read_inset(self):
return change.enabled, change.dwell, change.pause, curve, tempco # ignore curve no and temperature coefficient
enabled, dwell, pause, _, _ = literal_eval(
self.communicate('INSET?%d' % self.channel))
self.enabled = enabled
self.dwell = dwell
self.pause = pause
def analyze_filter(self, on, settle, window): @CommonWriteHandler(inset_params)
return dict(filter=settle if on else 0) def write_inset(self, change):
_, _, _, curve, tempco = literal_eval(
self.communicate('INSET?%d' % self.channel))
self.enabled, self.dwell, self.pause, _, _ = literal_eval(
self.communicate('INSET %d,%d,%d,%d,%d,%d;INSET?%d' % (
self.channel, change['enabled'], change['dwell'], change['pause'], curve, tempco,
self.channel)))
if 'enabled' in change and change['enabled']:
# switch to enabled channel
self.switcher.write_target(self.channel)
elif self.switcher.target == self.channel:
self.switcher.set_delays(self)
def change_filter(self, change): def read_filter(self):
_, settle, window = change.readValues() on, settle, _ = literal_eval(self.communicate('FILTER?%d' % self.channel))
if change.filter: return settle if on else 0
return 1, change.filter, 80 # always use 80% filter
return 0, settle, window
def write_enabled(self, value): def write_filter(self, value):
inset.write(self, 'enabled', value) on = 1 if value else 0
if value: value = max(1, value)
self.main.write_target(self.channel) on, settle, _ = literal_eval(self.communicate(
return Done 'FILTER %d,%d,%g,80;FILTER?%d' % (self.channel, on, value, self.channel)))
if not on:
settle = 0
return settle

View File

@ -34,6 +34,7 @@ class Ls370Sim(Communicator):
OTHER_COMMANDS = [ OTHER_COMMANDS = [
('*IDN?', 'LSCI,MODEL370,370184,05302003'), ('*IDN?', 'LSCI,MODEL370,370184,05302003'),
('SCAN?', '3,1'), ('SCAN?', '3,1'),
('*OPC?', '1'),
] ]
def earlyInit(self): def earlyInit(self):