diff --git a/cfg/addons/ls372.cfg b/cfg/addons/ls372.cfg new file mode 100644 index 0000000..81a3896 --- /dev/null +++ b/cfg/addons/ls372.cfg @@ -0,0 +1,22 @@ +[lsc] +description = +class = secop.io.StringIO +wait_before = 0.05 +uri = flamedil-ls:7777 + +[r1] +class = secop_psi.ls372.ResChannel +description = resistivity from LS 372 +switcher = channel +channel = 1 + +[r3] +class = secop_psi.ls372.ResChannel +description = resistivity from LS 372 +switcher = channel +channel = 3 + +[channel] +description = LS 372 channel switcher +class = secop_psi.ls372.Switcher +io = lsc diff --git a/secop_psi/channelswitcher.py b/secop_psi/channelswitcher.py new file mode 100644 index 0000000..cabffc4 --- /dev/null +++ b/secop_psi/channelswitcher.py @@ -0,0 +1,179 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# ***************************************************************************** +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free Software +# Foundation; either version 2 of the License, or (at your option) any later +# version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# this program; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Module authors: +# Markus Zolliker +# ***************************************************************************** +"""channel switcher Mixin + +Example Config File: + +[sw] +description=the switcher for blabla channels +class=frappy_facility.module.YourChannelSwitcher +uri=... + +[chan1] +description=channel 1 +class=frappy_facility.module.YourChannel +switcher=sw + +[chan2] +... +""" + +import time + +from secop.datatypes import IntRange, BoolType, FloatRange +from secop.core import Attached, Property, Drivable, Parameter, Readable, Done + + +class ChannelSwitcher(Drivable): + """base class for the channel switcher + + minimum implementation: + - override :meth:`set_active_channel` + + - .read_value() and .read_status() is called periodically + every .pollinterval on the active channel only + - .is_switching(...) is called every .pollinterval + during switching period, until it is returning False + """ + value = Parameter('the current channel number', IntRange(), needscfg=False) + target = Parameter('channel to select', IntRange(), needscfg=False) + autoscan = Parameter('whether to scan automatically', + BoolType(), readonly=False, default=True) + pollinterval = Parameter(default=1, export=False) + switch_delay = Parameter('the time needed to switch between channels', + FloatRange(0, None), readonly=False, default=5) + measure_delay = Parameter('the time for staying at a channel', + FloatRange(0, None), readonly=False, default=2) + + fast_poll = 0.1 + _channels = None # dict of + _start_measure = 0 + _last_measure = 0 + _start_switch = 0 + _time_tol = 0.5 + + def earlyInit(self): + super().earlyInit() + self._channels = {} + + def register_channel(self, mod): + """register module""" + self._channels[mod.channel] = mod + + def set_active_channel(self, chan): + """tell the HW the active channel + + :param chan: a channel object + + to be implemented + """ + raise NotImplementedError + + def next_channel(self, channelno): + next_channel = channelno + first_channel = None + for ch, mod in self._channels.items(): + if mod.enabled: + if first_channel is None: + first_channel = ch + if next_channel == ch: + next_channel = None + elif next_channel is None: + next_channel = ch + break + else: + next_channel = first_channel + return next_channel + + def read_status(self): + now = time.monotonic() + if self.status[0] == 'BUSY': + chan = self._channels[self.target] + if chan.is_switching(now, self._start_switch, self.switch_delay): + return Done + self.setFastPoll(False) + self.status = 'IDLE', 'measure' + self.value = self.target + self._start_measure = self._last_measure = now + chan.read_value() + chan.read_status() + if self.measure_delay > self._time_tol: + return Done + else: + chan = self._channels[self.value] + self.read_value() # this might modify autoscan or deadline! + if chan.enabled: + if self.target != self.value: # may happen after startup + self.target = self.value + next_measure = self._last_measure + chan.pollinterval + if now + self._time_tol > next_measure: + chan.read_value() + chan.read_status() + self._last_measure = next_measure + if not self.autoscan or now + self._time_tol < self._start_measure + self.measure_delay: + return Done + next_channel = self.next_channel(self.value) + if next_channel == self.value: + return 'IDLE', 'single channel' + if next_channel is None: + return 'ERROR', 'no enabled channel' + self.write_target(next_channel) + return self.status + + def write_pollinterval(self, value): + self._time_tol = min(1, value) * 0.5 + return value + + def write_target(self, channel): + if channel not in self._channels: + raise ValueError('%r is no valid channel' % channel) + if channel == self.target and self._channels[channel].enabled: + return channel + chan = self._channels[channel] + chan.enabled = True + self.set_active_channel(chan) + self._start_switch = time.monotonic() + self.status = 'BUSY', 'change channel' + self.setFastPoll(True, self.fast_poll) + return channel + + +class Channel(Readable): + """base class for channels + + you should override the datatype of the channel property, + in order to match the datatype of the switchers value + """ + switcher = Attached() + channel = Property('channel number', IntRange()) + enabled = Parameter('enabled flag', BoolType(), default=True) + value = Parameter(needscfg=False) + + def initModule(self): + super().initModule() + self.switcher.register_channel(self) + + def doPoll(self): + """value and status are polled by switcher""" + + def is_switching(self, now, last_switch, switch_delay): + """returns True when switching is done""" + return now + self.switcher._time_tol < last_switch + switch_delay diff --git a/secop_psi/ls372.py b/secop_psi/ls372.py new file mode 100644 index 0000000..bbe45a9 --- /dev/null +++ b/secop_psi/ls372.py @@ -0,0 +1,329 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# ***************************************************************************** +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free Software +# Foundation; either version 2 of the License, or (at your option) any later +# version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# this program; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Module authors: +# Markus Zolliker +# ***************************************************************************** +"""LakeShore Model 372 resistance channel + +implements autoscan and autorange by software. +when the autoscan or autorange button is pressed, the state is toggled, +and the hardware mode switched off again. +At startup, the configurable default mode is set, independent of +the hardware state. +""" + +import time + +import secop.io +from secop.datatypes import BoolType, EnumType, FloatRange, IntRange +from secop.lib import formatStatusBits +from secop.core import Done, Drivable, Parameter, Property, CommonReadHandler, CommonWriteHandler +from secop.io import HasIO +from secop_psi.channelswitcher import Channel, ChannelSwitcher + +Status = Drivable.Status + + +STATUS_BIT_LABELS = 'CS_OVL VCM_OVL VMIX_OVL VDIF_OVL R_OVER R_UNDER T_OVER T_UNDER'.split() + + +def parse1(string): + try: + return int(string) + except ValueError: + pass + try: + return float(string) + except ValueError: + return string + + +def parse(reply): + return [parse1(s) for s in reply.split(',')] + + +class StringIO(secop.io.StringIO): + identification = [('*IDN?', 'LSCI,MODEL372,.*')] + wait_before = 0.05 + + +class Switcher(HasIO, ChannelSwitcher): + value = Parameter(datatype=IntRange(1, 16)) + target = Parameter(datatype=IntRange(1, 16)) + use_common_delays = Parameter('use switch_delay and measure_delay instead of the channels pause and dwell', + BoolType(), readonly=False, default=False) + common_pause = Parameter('pause with common delays', FloatRange(3, 200, unit='s'), readonly=False, default=3) + ioClass = StringIO + fast_poll = 1 + _measure_delay = None + _switch_delay = None + + def startModule(self, start_events): + super().startModule(start_events) + # disable unused channels + for ch in range(1, 16): + if ch not in self._channels: + self.communicate('INSET %d,0,0,0,0,0;INSET?%d' % (ch, ch)) + channelno, autoscan = parse(self.communicate('SCAN?')) + if channelno in self._channels and self._channels[channelno].enabled: + if not autoscan: + return # nothing to do + else: + channelno = self.next_channel(channelno) + if channelno is None: + self.status = 'ERROR', 'no enabled channel' + return + self.communicate('SCAN %d,0;SCAN?' % channelno) + + def doPoll(self): + """poll buttons + + and check autorange during filter time + """ + super().doPoll() + self._channels[self.target]._read_value() # check range or read + channelno, autoscan = parse(self.communicate('SCAN?')) + if autoscan: + # pressed autoscan button: switch off HW autoscan and toggle soft autoscan + self.autoscan = not self.autoscan + self.communicate('SCAN %d,0;SCAN?' % self.value) + if channelno != self.value: + # channel changed by keyboard, do not yet return new channel + self.write_target(channelno) + chan = self._channels.get(channelno) + if chan is None: + channelno = self.next_channel(channelno) + if channelno is None: + raise ValueError('no channels enabled') + self.write_target(channelno) + chan = self._channels.get(self.value) + chan.read_autorange() + chan.fix_autorange() # check for toggled autorange button + return Done + + def write_switch_delay(self, value): + self._switch_delay = value + return super().write_switch_delay(value) + + def write_measure_delay(self, value): + self._measure_delay = value + return super().write_measure_delay(value) + + def write_use_common_delays(self, value): + if value: + # use values from a previous change, instead of + # the values from the current channel + if self._measure_delay is not None: + self.measure_delay = self._measure_delay + if self._switch_delay is not None: + self.switch_delay = self._switch_delay + return value + + def set_delays(self, chan): + if self.use_common_delays: + if chan.dwell != self.measure_delay: + chan.write_dwell(self.measure_delay) + if chan.pause != self.common_pause: + chan.write_pause(self.common_pause) + filter_ = max(0, self.switch_delay - self.common_pause) + if chan.filter != filter_: + chan.write_filter(filter_) + else: + # switch_delay and measure_delay is changing with channel + self.switch_delay = chan.pause + chan.filter + self.measure_delay = chan.dwell + + def set_active_channel(self, chan): + self.communicate('SCAN %d,0;SCAN?' % chan.channel) + chan._last_range_change = time.monotonic() + self.set_delays(chan) + + +class ResChannel(Channel): + """temperature channel on Lakeshore 372""" + + RES_RANGE = {key: i+1 for i, key in list( + enumerate(mag % val for mag in ['%gmOhm', '%gOhm', '%gkOhm', '%gMOhm'] + for val in [2, 6.32, 20, 63.2, 200, 632]))[:-2]} + CUR_RANGE = {key: i + 1 for i, key in list( + enumerate(mag % val for mag in ['%gpA', '%gnA', '%guA', '%gmA'] + for val in [1, 3.16, 10, 31.6, 100, 316]))[:-2]} + VOLT_RANGE = {key: i + 1 for i, key in list( + enumerate(mag % val for mag in ['%guV', '%gmV'] + for val in [2, 6.32, 20, 63.2, 200, 632]))} + RES_SCALE = [2 * 10 ** (0.5 * i) for i in range(-7, 16)] # RES_SCALE[0] is not used + MAX_RNG = len(RES_SCALE) - 1 + + channel = Property('the Lakeshore channel', datatype=IntRange(1, 16), export=False) + + value = Parameter(datatype=FloatRange(unit='Ohm')) + pollinterval = Parameter(visibility=3, default=1) + range = Parameter('reading range', readonly=False, + datatype=EnumType(**RES_RANGE)) + minrange = Parameter('minimum range for software autorange', readonly=False, default=1, + datatype=EnumType(**RES_RANGE)) + autorange = Parameter('autorange', datatype=BoolType(), + readonly=False, default=1) + iexc = Parameter('current excitation', datatype=EnumType(off=0, **CUR_RANGE), readonly=False) + vexc = Parameter('voltage excitation', datatype=EnumType(off=0, **VOLT_RANGE), readonly=False) + enabled = Parameter('is this channel enabled?', datatype=BoolType(), readonly=False) + pause = Parameter('pause after channel change', datatype=FloatRange(3, 60, unit='s'), readonly=False) + dwell = Parameter('dwell time with autoscan', datatype=FloatRange(1, 200, unit='s'), readonly=False) + filter = Parameter('filter time', datatype=FloatRange(1, 200, unit='s'), readonly=False) + + _toggle_autorange = 'init' + _prev_rdgrng = (1, 1) # last read values for icur and exc + _last_range_change = 0 + rdgrng_params = 'range', 'iexc', 'vexc' + inset_params = 'enabled', 'pause', 'dwell' + + def communicate(self, command): + return self.switcher.communicate(command) + + def read_status(self): + if not self.enabled: + return [self.Status.DISABLED, 'disabled'] + if not self.channel == self.switcher.value == self.switcher.target: + return Done + result = int(self.communicate('RDGST?%d' % self.channel)) + result &= 0x37 # mask T_OVER and T_UNDER (change this when implementing temperatures instead of resistivities) + statustext = ' '.join(formatStatusBits(result, STATUS_BIT_LABELS)) + if statustext: + return [self.Status.ERROR, statustext] + return [self.Status.IDLE, ''] + + def _read_value(self): + """read value, without update""" + now = time.monotonic() + if now + 0.5 < max(self._last_range_change, self.switcher._start_switch) + self.pause: + return None + result = self.communicate('RDGR?%d' % self.channel) + result = float(result) + if self.autorange: + self.fix_autorange() + if now + 0.5 > self._last_range_change + self.pause: + rng = int(max(self.minrange, self.range)) # convert from enum to int + if self.status[1] == '': + if abs(result) > self.RES_SCALE[rng]: + if rng < 22: + rng += 1 + else: + lim = 0.2 + while rng > self.minrange and abs(result) < lim * self.RES_SCALE[rng]: + rng -= 1 + lim -= 0.05 # not more than 4 steps at once + # effectively: <0.16 %: 4 steps, <1%: 3 steps, <5%: 2 steps, <20%: 1 step + elif rng < self.MAX_RNG: + rng = min(self.MAX_RNG, rng + 1) + if rng != self.range: + self.write_range(rng) + self._last_range_change = now + return result + + def read_value(self): + if self.channel == self.switcher.value == self.switcher.target: + return self._read_value() + return Done # return previous value + + def is_switching(self, now, last_switch, switch_delay): + last_switch = max(last_switch, self._last_range_change) + if now + 0.5 > last_switch + self.pause: + self._read_value() # adjust range only + return super().is_switching(now, last_switch, switch_delay) + + @CommonReadHandler(rdgrng_params) + def read_rdgrng(self): + iscur, exc, rng, autorange, excoff = parse( + self.communicate('RDGRNG?%d' % self.channel)) + self._prev_rdgrng = iscur, exc + if autorange: # pressed autorange button + if not self._toggle_autorange: + self._toggle_autorange = True + iexc = 0 if excoff or not iscur else exc + vexc = 0 if excoff or iscur else exc + if (rng, iexc, vexc) != (self.range, self.iexc, self.vexc): + self._last_range_change = time.monotonic() + self.range, self.iexc, self.vexc = rng, iexc, vexc + + @CommonWriteHandler(rdgrng_params) + def write_rdgrng(self, change): + self.read_range() # make sure autorange is handled + if 'vexc' in change: # in case vext is changed, do not consider iexc + change['iexc'] = 0 + if change['iexc'] != 0: # we need '!= 0' here, as bool(enum) is always True! + iscur = 1 + exc = change['iexc'] + excoff = 0 + elif change['vexc'] != 0: # we need '!= 0' here, as bool(enum) is always True! + iscur = 0 + exc = change['vexc'] + excoff = 0 + else: + iscur, exc = self._prev_rdgrng # set to last read values + excoff = 1 + rng = change['range'] + if self.autorange: + if rng < self.minrange: + rng = self.minrange + self.communicate('RDGRNG %d,%d,%d,%d,%d,%d;*OPC?' % ( + self.channel, iscur, exc, rng, 0, excoff)) + self.read_range() + + def fix_autorange(self): + if self._toggle_autorange: + if self._toggle_autorange == 'init': + self.write_autorange(True) + else: + self.write_autorange(not self.autorange) + self._toggle_autorange = False + + @CommonReadHandler(inset_params) + def read_inset(self): + # ignore curve no and temperature coefficient + enabled, dwell, pause, _, _ = parse( + self.communicate('INSET?%d' % self.channel)) + self.enabled = enabled + self.dwell = dwell + self.pause = pause + + @CommonWriteHandler(inset_params) + def write_inset(self, change): + _, _, _, curve, tempco = parse( + self.communicate('INSET?%d' % self.channel)) + self.enabled, self.dwell, self.pause, _, _ = parse( + self.communicate('INSET %d,%d,%d,%d,%d,%d;INSET?%d' % ( + self.channel, change['enabled'], change['dwell'], change['pause'], curve, tempco, + self.channel))) + if 'enabled' in change and change['enabled']: + # switch to enabled channel + self.switcher.write_target(self.channel) + elif self.switcher.target == self.channel: + self.switcher.set_delays(self) + + def read_filter(self): + on, settle, _ = parse(self.communicate('FILTER?%d' % self.channel)) + return settle if on else 0 + + def write_filter(self, value): + on = 1 if value else 0 + value = max(1, value) + on, settle, _ = parse(self.communicate( + 'FILTER %d,%d,%g,80;FILTER?%d' % (self.channel, on, value, self.channel))) + if not on: + settle = 0 + return settle