From 4c94580cb9a958dfd6188187eb629c87d0c3cba4 Mon Sep 17 00:00:00 2001 From: Markus Zolliker Date: Tue, 24 May 2022 16:34:10 +0200 Subject: [PATCH] channel switcher for Lakeshore 370 with scanner - add a general channel switcher module - change ls370res code from IOHandler to rwhandlers + fix an issue with the poller when io module is placed below using modules in cfg file after this, IOHandler stuff may be removed from Frappy Change-Id: I787101fc1e365ae3e0453bfe59291e2011a1fe53 Reviewed-on: https://forge.frm2.tum.de/review/c/sine2020/secop/playground/+/28512 Tested-by: Jenkins Automated Tests Reviewed-by: Enrico Faulhaber Reviewed-by: Markus Zolliker --- Makefile | 2 +- cfg/ls370sim.cfg | 29 +-- secop/modules.py | 2 +- secop_psi/channelswitcher.py | 179 +++++++++++++++ secop_psi/ls370res.py | 407 +++++++++++++++++++---------------- secop_psi/ls370sim.py | 1 + 6 files changed, 418 insertions(+), 202 deletions(-) create mode 100644 secop_psi/channelswitcher.py diff --git a/Makefile b/Makefile index 6e4f543..3148ea2 100644 --- a/Makefile +++ b/Makefile @@ -38,7 +38,7 @@ doc: $(MAKE) -C doc html lint: - pylint -j 0 -f colorized -r n --rcfile=.pylintrc secop secop_* test + pylint -f colorized -r n --rcfile=.pylintrc secop secop_* test isort: @find test -name '*.py' -print0 | xargs -0 isort -e -m 2 -w 80 -ns __init__.py diff --git a/cfg/ls370sim.cfg b/cfg/ls370sim.cfg index e78e366..bb11d01 100644 --- a/cfg/ls370sim.cfg +++ b/cfg/ls370sim.cfg @@ -5,19 +5,24 @@ description = Lsc Simulation at PSI [INTERFACE] uri = tcp://5000 -[res] -class = secop_psi.ls370res.ResChannel -channel = 3 -description = resistivity -main = lsmain -io = lscom - -[lsmain] -class = secop_psi.ls370res.Main -description = main control of Lsc controller -io = lscom - [lscom] class = secop_psi.ls370sim.Ls370Sim description = simulated serial communicator to a LS 370 visibility = 3 + +[sw] +class = secop_psi.ls370res.Switcher +description = channel switcher for Lsc controller +io = lscom + +[a] +class = secop_psi.ls370res.ResChannel +channel = 1 +description = resistivity +switcher = sw + +[b] +class = secop_psi.ls370res.ResChannel +channel = 3 +description = resistivity +switcher = sw diff --git a/secop/modules.py b/secop/modules.py index 48ebfa5..7bca6de 100644 --- a/secop/modules.py +++ b/secop/modules.py @@ -597,7 +597,7 @@ class Module(HasAccessibles): self.io.polledModules.append(self) else: self.triggerPoll = threading.Event() - self.polledModules = [self] + self.polledModules.append(self) def startModule(self, start_events): """runs after init of all modules diff --git a/secop_psi/channelswitcher.py b/secop_psi/channelswitcher.py new file mode 100644 index 0000000..4c3fe57 --- /dev/null +++ b/secop_psi/channelswitcher.py @@ -0,0 +1,179 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# ***************************************************************************** +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free Software +# Foundation; either version 2 of the License, or (at your option) any later +# version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# this program; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Module authors: +# Markus Zolliker +# ***************************************************************************** +"""channel switcher Mixin + +Example Config File: + +[sw] +description=the switcher for blabla channels +class=secop_facility.module.YourChannelSwitcher +uri=... + +[chan1] +description=channel 1 +class=secop_facility.module.YourChannel +switcher=sw + +[chan2] +... +""" + +import time + +from secop.datatypes import IntRange, BoolType, FloatRange +from secop.core import Attached, Property, Drivable, Parameter, Readable, Done + + +class ChannelSwitcher(Drivable): + """base class for the channel switcher + + minimum implementation: + - override :meth:`set_active_channel` + + - .read_value() and .read_status() is called periodically + every .pollinterval on the active channel only + - .is_switching(...) is called every .pollinterval + during switching period, until it is returning False + """ + value = Parameter('the current channel number', IntRange(), needscfg=False) + target = Parameter('channel to select', IntRange(), needscfg=False) + autoscan = Parameter('whether to scan automatically', + BoolType(), readonly=False, default=True) + pollinterval = Parameter(default=1, export=False) + switch_delay = Parameter('the time needed to switch between channels', + FloatRange(0, None), readonly=False, default=5) + measure_delay = Parameter('the time for staying at a channel', + FloatRange(0, None), readonly=False, default=2) + + fast_poll = 0.1 + _channels = None # dict of + _start_measure = 0 + _last_measure = 0 + _start_switch = 0 + _time_tol = 0.5 + + def earlyInit(self): + super().earlyInit() + self._channels = {} + + def register_channel(self, mod): + """register module""" + self._channels[mod.channel] = mod + + def set_active_channel(self, chan): + """tell the HW the active channel + + :param chan: a channel object + + to be implemented + """ + raise NotImplementedError + + def next_channel(self, channelno): + next_channel = channelno + first_channel = None + for ch, mod in self._channels.items(): + if mod.enabled: + if first_channel is None: + first_channel = ch + if next_channel == ch: + next_channel = None + elif next_channel is None: + next_channel = ch + break + else: + next_channel = first_channel + return next_channel + + def read_status(self): + now = time.monotonic() + if self.status[0] == 'BUSY': + chan = self._channels[self.target] + if chan.is_switching(now, self._start_switch, self.switch_delay): + return Done + self.setFastPoll(False) + self.status = 'IDLE', 'measure' + self.value = self.target + self._start_measure = self._last_measure = now + chan.read_value() + chan.read_status() + if self.measure_delay > self._time_tol: + return Done + else: + chan = self._channels[self.value] + self.read_value() # this might modify autoscan or deadline! + if chan.enabled: + if self.target != self.value: # may happen after startup + self.target = self.value + next_measure = self._last_measure + chan.pollinterval + if now + self._time_tol > next_measure: + chan.read_value() + chan.read_status() + self._last_measure = next_measure + if not self.autoscan or now + self._time_tol < self._start_measure + self.measure_delay: + return Done + next_channel = self.next_channel(self.value) + if next_channel == self.value: + return 'IDLE', 'single channel' + if next_channel is None: + return 'ERROR', 'no enabled channel' + self.write_target(next_channel) + return self.status + + def write_pollinterval(self, value): + self._time_tol = min(1, value) * 0.5 + return value + + def write_target(self, channel): + if channel not in self._channels: + raise ValueError('%r is no valid channel' % channel) + if channel == self.target and self._channels[channel].enabled: + return channel + chan = self._channels[channel] + chan.enabled = True + self.set_active_channel(chan) + self._start_switch = time.monotonic() + self.status = 'BUSY', 'change channel' + self.setFastPoll(True, self.fast_poll) + return channel + + +class Channel(Readable): + """base class for channels + + you should override the datatype of the channel property, + in order to match the datatype of the switchers value + """ + switcher = Attached() + channel = Property('channel number', IntRange()) + enabled = Parameter('enabled flag', BoolType(), default=True) + value = Parameter(needscfg=False) + + def initModule(self): + super().initModule() + self.switcher.register_channel(self) + + def doPoll(self): + """value and status are polled by switcher""" + + def is_switching(self, now, last_switch, switch_delay): + """returns True when switching is done""" + return now + self.switcher._time_tol < last_switch + switch_delay diff --git a/secop_psi/ls370res.py b/secop_psi/ls370res.py index 026ac01..eeb4e57 100644 --- a/secop_psi/ls370res.py +++ b/secop_psi/ls370res.py @@ -18,37 +18,28 @@ # Module authors: # Markus Zolliker # ***************************************************************************** -"""LakeShore Model 370 resistance channel""" +"""LakeShore Model 370 resistance channel + +implements autoscan and autorange by software. +when the autoscan or autorange button is pressed, the state is toggled, +and the hardware mode switched off again. +At startup, the configurable default mode is set, independent of +the hardware state. +""" import time +from ast import literal_eval -import secop.iohandler +import secop.io from secop.datatypes import BoolType, EnumType, FloatRange, IntRange from secop.lib import formatStatusBits -from secop.modules import Attached, Done, \ - Drivable, Parameter, Property, Readable +from secop.core import Done, Drivable, Parameter, Property, CommonReadHandler, CommonWriteHandler from secop.io import HasIO +from secop_psi.channelswitcher import Channel, ChannelSwitcher Status = Drivable.Status -class IOHandler(secop.iohandler.IOHandler): - CMDARGS = ['channel'] - CMDSEPARATOR = ';' - - def __init__(self, name, querycmd, replyfmt): - changecmd = querycmd.replace('?', ' ') - if not querycmd.endswith('?'): - changecmd += ',' - super().__init__(name, querycmd, replyfmt, changecmd) - - -rdgrng = IOHandler('rdgrng', 'RDGRNG?%(channel)d', '%d,%d,%d,%d,%d') -inset = IOHandler('inset', 'INSET?%(channel)d', '%d,%d,%d,%d,%d') -filterhdl = IOHandler('filter', 'FILTER?%(channel)d', '%d,%d,%d') -scan = IOHandler('scan', 'SCAN?', '%d,%d') - - STATUS_BIT_LABELS = 'CS_OVL VCM_OVL VMIX_OVL VDIF_OVL R_OVER R_UNDER T_OVER T_UNDER'.split() @@ -57,163 +48,143 @@ class StringIO(secop.io.StringIO): wait_before = 0.05 -class Main(HasIO, Drivable): - - value = Parameter('the current channel', datatype=IntRange(0, 17)) - target = Parameter('channel to select', datatype=IntRange(0, 17)) - autoscan = Parameter('whether to scan automatically', datatype=BoolType(), readonly=False, default=False) - pollinterval = Parameter(default=1, export=False) - +class Switcher(HasIO, ChannelSwitcher): + value = Parameter(datatype=IntRange(1, 16)) + target = Parameter(datatype=IntRange(1, 16)) + use_common_delays = Parameter('use switch_delay and measure_delay instead of the channels pause and dwell', + BoolType(), readonly=False, default=False) + common_pause = Parameter('pause with common delays', FloatRange(3, 200, unit='s'), readonly=False, default=3) ioClass = StringIO - _channel_changed = 0 # time of last channel change - _channels = None # dict of - - def earlyInit(self): - super().earlyInit() - self._channels = {} - - def register_channel(self, modobj): - self._channels[modobj.channel] = modobj + fast_poll = 1 + _measure_delay = None + _switch_delay = None def startModule(self, start_events): super().startModule(start_events) + # disable unused channels for ch in range(1, 16): if ch not in self._channels: self.communicate('INSET %d,0,0,0,0,0;INSET?%d' % (ch, ch)) + channelno, autoscan = literal_eval(self.communicate('SCAN?')) + if channelno in self._channels and self._channels[channelno].enabled: + if not autoscan: + return # nothing to do + else: + channelno = self.next_channel(channelno) + if channelno is None: + self.status = 'ERROR', 'no enabled channel' + return + self.communicate('SCAN %d,0;SCAN?' % channelno) - def read_value(self): - channel, auto = scan.send_command(self) - if channel not in self._channels: - return channel - if not self._channels[channel].enabled: - # channel was disabled recently, but still selected - nextchannel = 0 - for ch, mobj in self._channels.items(): - if mobj.enabled: - if ch > channel: - nextchannel = ch - break - if nextchannel == 0: - nextchannel = ch - if nextchannel: - self.write_target(nextchannel) - return 0 + def doPoll(self): + """poll buttons - now = time.time() - if channel != self.target: - self._channel_changed = now - self.target = channel - self.autoscan = int(auto) - if now < self._channel_changed + self._channels[channel].pause + self._channels[channel].filter: - self.status = [Status.BUSY, 'switching'] - return 0 - self.status = [Status.IDLE, ''] - return channel + and check autorange during filter time + """ + super().doPoll() + self._channels[self.target]._read_value() # check range or read + channelno, autoscan = literal_eval(self.communicate('SCAN?')) + if autoscan: + # pressed autoscan button: switch off HW autoscan and toggle soft autoscan + self.autoscan = not self.autoscan + self.communicate('SCAN %d,0;SCAN?' % self.value) + if channelno != self.value: + # channel changed by keyboard, do not yet return new channel + self.write_target(channelno) + chan = self._channels.get(channelno) + if chan is None: + channelno = self.next_channel(channelno) + if channelno is None: + raise ValueError('no channels enabled') + self.write_target(channelno) + chan = self._channels.get(self.value) + chan.read_autorange() + chan.fix_autorange() # check for toggled autorange button + return Done - def write_target(self, channel): - scan.send_change(self, channel, self.autoscan) - # self.communicate('SCAN %d,%d;SCAN?' % (channel, self.autoscan)) - if channel != self.value: - self.value = 0 - self._channel_changed = time.time() - self.status = [Status.BUSY, 'switching'] - return channel + def write_switch_delay(self, value): + self._switch_delay = value + return super().write_switch_delay(value) - def write_autoscan(self, value): - scan.send_change(self, self.value, value) - # self.communicate('SCAN %d,%d;SCAN?' % (channel, self.autoscan)) + def write_measure_delay(self, value): + self._measure_delay = value + return super().write_measure_delay(value) + + def write_use_common_delays(self, value): + if value: + # use values from a previous change, instead of + # the values from the current channel + if self._measure_delay is not None: + self.measure_delay = self._measure_delay + if self._switch_delay is not None: + self.switch_delay = self._switch_delay return value + def set_delays(self, chan): + if self.use_common_delays: + if chan.dwell != self.measure_delay: + chan.write_dwell(self.measure_delay) + if chan.pause != self.common_pause: + chan.write_pause(self.common_pause) + filter_ = max(0, self.switch_delay - self.common_pause) + if chan.filter != filter_: + chan.write_filter(filter_) + else: + # switch_delay and measure_delay is changing with channel + self.switch_delay = chan.pause + chan.filter + self.measure_delay = chan.dwell -class ResChannel(HasIO, Readable): - """temperature channel on Lakeshore 336""" + def set_active_channel(self, chan): + self.communicate('SCAN %d,0;SCAN?' % chan.channel) + chan._last_range_change = time.monotonic() + self.set_delays(chan) + + +class ResChannel(Channel): + """temperature channel on Lakeshore 370""" RES_RANGE = {key: i+1 for i, key in list( enumerate(mag % val for mag in ['%gmOhm', '%gOhm', '%gkOhm', '%gMOhm'] for val in [2, 6.32, 20, 63.2, 200, 632]))[:-2]} - RES_SCALE = [2 * 10 ** (0.5 * i) for i in range(-7, 16)] # RES_SCALE[0] is not used CUR_RANGE = {key: i + 1 for i, key in list( enumerate(mag % val for mag in ['%gpA', '%gnA', '%guA', '%gmA'] for val in [1, 3.16, 10, 31.6, 100, 316]))[:-2]} VOLT_RANGE = {key: i + 1 for i, key in list( enumerate(mag % val for mag in ['%guV', '%gmV'] for val in [2, 6.32, 20, 63.2, 200, 632]))} - - ioClass = StringIO - _main = None # main module - _last_range_change = 0 # time of last range change + RES_SCALE = [2 * 10 ** (0.5 * i) for i in range(-7, 16)] # RES_SCALE[0] is not used + MAX_RNG = len(RES_SCALE) - 1 channel = Property('the Lakeshore channel', datatype=IntRange(1, 16), export=False) - main = Attached() value = Parameter(datatype=FloatRange(unit='Ohm')) - pollinterval = Parameter(visibility=3, default=1, export=False) + pollinterval = Parameter(visibility=3, default=1) range = Parameter('reading range', readonly=False, - datatype=EnumType(**RES_RANGE), handler=rdgrng) + datatype=EnumType(**RES_RANGE)) minrange = Parameter('minimum range for software autorange', readonly=False, default=1, datatype=EnumType(**RES_RANGE)) - autorange = Parameter('autorange', datatype=EnumType(off=0, hard=1, soft=2), - readonly=False, handler=rdgrng, default=2) - iexc = Parameter('current excitation', datatype=EnumType(off=0, **CUR_RANGE), readonly=False, handler=rdgrng) - vexc = Parameter('voltage excitation', datatype=EnumType(off=0, **VOLT_RANGE), readonly=False, handler=rdgrng) - enabled = Parameter('is this channel enabled?', datatype=BoolType(), readonly=False, handler=inset) - pause = Parameter('pause after channel change', datatype=FloatRange(3, 60), readonly=False, handler=inset) - dwell = Parameter('dwell time with autoscan', datatype=FloatRange(1, 200), readonly=False, handler=inset) - filter = Parameter('filter time', datatype=FloatRange(1, 200), readonly=False, handler=filterhdl) + autorange = Parameter('autorange', datatype=BoolType(), + readonly=False, default=1) + iexc = Parameter('current excitation', datatype=EnumType(off=0, **CUR_RANGE), readonly=False) + vexc = Parameter('voltage excitation', datatype=EnumType(off=0, **VOLT_RANGE), readonly=False) + enabled = Parameter('is this channel enabled?', datatype=BoolType(), readonly=False) + pause = Parameter('pause after channel change', datatype=FloatRange(3, 60, unit='s'), readonly=False) + dwell = Parameter('dwell time with autoscan', datatype=FloatRange(1, 200, unit='s'), readonly=False) + filter = Parameter('filter time', datatype=FloatRange(1, 200, unit='s'), readonly=False) - _trigger_read = False + _toggle_autorange = 'init' + _prev_rdgrng = (1, 1) # last read values for icur and exc + _last_range_change = 0 + rdgrng_params = 'range', 'iexc', 'vexc' + inset_params = 'enabled', 'pause', 'dwell' - def initModule(self): - super().initModule() - self._main = self.DISPATCHER.get_module(self.main) - self._main.register_channel(self) - - def read_value(self): - if not self.enabled: - self.status = [self.Status.DISABLED, 'disabled'] - return Done - if self.channel != self._main.value: - if self.channel == self._main.target: - self._trigger_read = True - return Done - if not self._trigger_read: - return Done - # we got here, when we missed the idle state of self._main - self._trigger_read = False - result = self.communicate('RDGR?%d' % self.channel) - result = float(result) - if self.autorange == 'soft': - now = time.time() - if now > self._last_range_change + self.pause: - rng = int(max(self.minrange, self.range)) # convert from enum to int - if self.status[1] == '': - if abs(result) > self.RES_SCALE[rng]: - if rng < 22: - rng += 1 - self.log.info('chan %d: increased range to %.3g' % - (self.channel, self.RES_SCALE[rng])) - else: - lim = 0.2 - while rng > self.minrange and abs(result) < lim * self.RES_SCALE[rng]: - rng -= 1 - lim -= 0.05 # not more than 4 steps at once - # effectively: <0.16 %: 4 steps, <1%: 3 steps, <5%: 2 steps, <20%: 1 step - if lim != 0.2: - self.log.info('chan %d: lowered range to %.3g' % - (self.channel, self.RES_SCALE[rng])) - elif rng < 22: - rng = min(22, rng + 1) - self.log.info('chan: %d, %s, increased range to %.3g' % - (self.channel, self.status[1], self.RES_SCALE[rng])) - if rng != self.range: - self.write_range(rng) - self._last_range_change = now - return result + def communicate(self, command): + return self.switcher.communicate(command) def read_status(self): if not self.enabled: return [self.Status.DISABLED, 'disabled'] - if self.channel != self.main.value: + if not self.channel == self.switcher.value == self.switcher.target: return Done result = int(self.communicate('RDGST?%d' % self.channel)) result &= 0x37 # mask T_OVER and T_UNDER (change this when implementing temperatures instead of resistivities) @@ -222,63 +193,123 @@ class ResChannel(HasIO, Readable): return [self.Status.ERROR, statustext] return [self.Status.IDLE, ''] - def analyze_rdgrng(self, iscur, exc, rng, autorange, excoff): - result = dict(range=rng) - if autorange: - result['autorange'] = 'hard' - # else: do not change autorange - self.log.debug('%s range %r %r %r' % (self.name, rng, autorange, self.autorange)) - if excoff: - result.update(iexc=0, vexc=0) - elif iscur: - result.update(iexc=exc, vexc=0) - else: - result.update(iexc=0, vexc=exc) + def _read_value(self): + """read value, without update""" + now = time.monotonic() + if now + 0.5 < max(self._last_range_change, self.switcher._start_switch) + self.pause: + return None + result = self.communicate('RDGR?%d' % self.channel) + result = float(result) + if self.autorange: + self.fix_autorange() + if now + 0.5 > self._last_range_change + self.pause: + rng = int(max(self.minrange, self.range)) # convert from enum to int + if self.status[1] == '': + if abs(result) > self.RES_SCALE[rng]: + if rng < 22: + rng += 1 + else: + lim = 0.2 + while rng > self.minrange and abs(result) < lim * self.RES_SCALE[rng]: + rng -= 1 + lim -= 0.05 # not more than 4 steps at once + # effectively: <0.16 %: 4 steps, <1%: 3 steps, <5%: 2 steps, <20%: 1 step + elif rng < self.MAX_RNG: + rng = min(self.MAX_RNG, rng + 1) + if rng != self.range: + self.write_range(rng) + self._last_range_change = now return result - def change_rdgrng(self, change): - iscur, exc, rng, autorange, excoff = change.readValues() - if change.doesInclude('vexc'): # in case vext is changed, do not consider iexc - change.iexc = 0 - if change.iexc != 0: # we need '!= 0' here, as bool(enum) is always True! + def read_value(self): + if self.channel == self.switcher.value == self.switcher.target: + return self._read_value() + return Done # return previous value + + def is_switching(self, now, last_switch, switch_delay): + last_switch = max(last_switch, self._last_range_change) + if now + 0.5 > last_switch + self.pause: + self._read_value() # adjust range only + return super().is_switching(now, last_switch, switch_delay) + + @CommonReadHandler(rdgrng_params) + def read_rdgrng(self): + iscur, exc, rng, autorange, excoff = literal_eval( + self.communicate('RDGRNG?%d' % self.channel)) + self._prev_rdgrng = iscur, exc + if autorange: # pressed autorange button + if not self._toggle_autorange: + self._toggle_autorange = True + iexc = 0 if excoff or not iscur else exc + vexc = 0 if excoff or iscur else exc + if (rng, iexc, vexc) != (self.range, self.iexc, self.vexc): + self._last_range_change = time.monotonic() + self.range, self.iexc, self.vexc = rng, iexc, vexc + + @CommonWriteHandler(rdgrng_params) + def write_rdgrng(self, change): + self.read_range() # make sure autorange is handled + if 'vexc' in change: # in case vext is changed, do not consider iexc + change['iexc'] = 0 + if change['iexc'] != 0: # we need '!= 0' here, as bool(enum) is always True! iscur = 1 - exc = change.iexc + exc = change['iexc'] excoff = 0 - elif change.vexc != 0: # we need '!= 0' here, as bool(enum) is always True! + elif change['vexc'] != 0: # we need '!= 0' here, as bool(enum) is always True! iscur = 0 - exc = change.vexc + exc = change['vexc'] excoff = 0 else: + iscur, exc = self._prev_rdgrng # set to last read values excoff = 1 - rng = change.range - if change.autorange == 'hard': - autorange = 1 - else: - autorange = 0 - if change.autorange == 'soft': - if rng < self.minrange: - rng = self.minrange - self.autorange = change.autorange - return iscur, exc, rng, autorange, excoff + rng = change['range'] + if self.autorange: + if rng < self.minrange: + rng = self.minrange + self.communicate('RDGRNG %d,%d,%d,%d,%d,%d;*OPC?' % ( + self.channel, iscur, exc, rng, 0, excoff)) + self.read_range() - def analyze_inset(self, on, dwell, pause, curve, tempco): - return dict(enabled=on, dwell=dwell, pause=pause) + def fix_autorange(self): + if self._toggle_autorange: + if self._toggle_autorange == 'init': + self.write_autorange(True) + else: + self.write_autorange(not self.autorange) + self._toggle_autorange = False - def change_inset(self, change): - _, _, _, curve, tempco = change.readValues() - return change.enabled, change.dwell, change.pause, curve, tempco + @CommonReadHandler(inset_params) + def read_inset(self): + # ignore curve no and temperature coefficient + enabled, dwell, pause, _, _ = literal_eval( + self.communicate('INSET?%d' % self.channel)) + self.enabled = enabled + self.dwell = dwell + self.pause = pause - def analyze_filter(self, on, settle, window): - return dict(filter=settle if on else 0) + @CommonWriteHandler(inset_params) + def write_inset(self, change): + _, _, _, curve, tempco = literal_eval( + self.communicate('INSET?%d' % self.channel)) + self.enabled, self.dwell, self.pause, _, _ = literal_eval( + self.communicate('INSET %d,%d,%d,%d,%d,%d;INSET?%d' % ( + self.channel, change['enabled'], change['dwell'], change['pause'], curve, tempco, + self.channel))) + if 'enabled' in change and change['enabled']: + # switch to enabled channel + self.switcher.write_target(self.channel) + elif self.switcher.target == self.channel: + self.switcher.set_delays(self) - def change_filter(self, change): - _, settle, window = change.readValues() - if change.filter: - return 1, change.filter, 80 # always use 80% filter - return 0, settle, window + def read_filter(self): + on, settle, _ = literal_eval(self.communicate('FILTER?%d' % self.channel)) + return settle if on else 0 - def write_enabled(self, value): - inset.write(self, 'enabled', value) - if value: - self.main.write_target(self.channel) - return Done + def write_filter(self, value): + on = 1 if value else 0 + value = max(1, value) + on, settle, _ = literal_eval(self.communicate( + 'FILTER %d,%d,%g,80;FILTER?%d' % (self.channel, on, value, self.channel))) + if not on: + settle = 0 + return settle diff --git a/secop_psi/ls370sim.py b/secop_psi/ls370sim.py index 969d566..015f202 100644 --- a/secop_psi/ls370sim.py +++ b/secop_psi/ls370sim.py @@ -34,6 +34,7 @@ class Ls370Sim(Communicator): OTHER_COMMANDS = [ ('*IDN?', 'LSCI,MODEL370,370184,05302003'), ('SCAN?', '3,1'), + ('*OPC?', '1'), ] def earlyInit(self):