diff --git a/README.md b/README.md index fbdaba9..0cf196f 100644 --- a/README.md +++ b/README.md @@ -3,9 +3,57 @@ current running code at SINQ, with newest changes not yet pushed through the Gerrit workflow at MLZ +## Branches + branches: -- mlz: master from forge.frm2.tum.de:29418/sine2020/secop/playground -- master: the same as above, but with origin: git.psi.ch:sinqdev/frappy.git +- from-mlz: master from forge.frm2.tum.de:29418/sine2020/secop/playground + this is not present at git.psi.ch:sinqdev/frappy.git! +- mlz: keep in sync with from-mlz before pushing (origin git.psi.ch:sinqdev/frappy.git) +- master: the last synced state between mlz and wip/work + (this does NOT contain local repo files only, however, all common files work/mlz should match) - work: current working version, usually in use on /home/l_samenv/frappy (and on neutron instruments) + this should be a copy of an earlier state of the wip branch - wip: current test version, usually in use on /home/l_samenv/frappy_wip + + +master --> mlz # these branches match after a sync step, but they might have a different history +master --> work --> wip + +apply commits from mlz to master: (rebase ?) or use cherry-pick: + + git cherry-pick .. + +where sha1 is the last commit already in wip, and sha2 ist the last commit to be applied +(for a single commit .. may be omitted) + +the wip branch is also present in an other directory (currently zolliker/switchdrive/gitmlz/frappy), +where commits may be cherry picked for input to Gerrit. As generally in the review process some additonal +changes are done, eventually a sync step should happen: + +1) ideally, this is done when work and wip match +1) make copies of branches master, work and wip +2) pull changes from mlz repo to from-mlz branch: git checkout from-mlz; git pull +3) copy to from-mlz to mlz branch: git checkout mlz; git pull; git checkout from-mlz; git checkout -B mlz; git push +4) cherry-pick commits (from mlz) to master (git checkout master; git pull before) +5) copy master branch to work with 'git checkout -B work'. + Not sure if this works, as work is to be pushed to git.psi.ch. + We might first remove the remote branch with 'git push origin --delete work'. + And then create again (git push origin work)? +6) in work: cherry-pick commits not yet feeded into Gerrit from copy in step (1) +7) git checkout -B wip +8) if wip and work did not match: cherry pick changes from wip copy to wip or merge +8) delete branch copies not needed any more + + +## Procedure to update PPMS + +1) git checkout wip (or work, whatever state to copy to ppms) +2) git checkout -B ppms # local branch ? +3) assume PPMSData is mounted on /Volumes/PPMSData + + cp -r secop_psi /Volumes/PPMSData/zolliker/frappy/secop_psi + cp -r secop /Volumes/PPMSData/zolliker/frappy/secop + + it may be that additional folder have to copied ... + diff --git a/secop/historywriter.py b/secop/historywriter.py new file mode 100644 index 0000000..76b1bea --- /dev/null +++ b/secop/historywriter.py @@ -0,0 +1,140 @@ +# -*- coding: utf-8 -*- +# ***************************************************************************** +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free Software +# Foundation; either version 2 of the License, or (at your option) any later +# version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# this program; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Module authors: +# Markus Zolliker +# ***************************************************************************** + +import time +import frappyhistory # pylint: disable=import-error +from secop.datatypes import get_datatype, IntRange, FloatRange, ScaledInteger,\ + EnumType, BoolType, StringType, TupleOf, StructOf + + +def make_cvt_list(dt, tail=''): + """create conversion list + + list of tuple (, , ) + tail is a postfix to be appended in case of tuples and structs + """ + if isinstance(dt, (EnumType, IntRange, BoolType)): + return[(int, tail, dict(type='NUM'))] + if isinstance(dt, (FloatRange, ScaledInteger)): + return [(dt.import_value, tail, dict(type='NUM', unit=dt.unit, period=5) if dt.unit else {})] + if isinstance(dt, StringType): + return [(lambda x: x, tail, dict(type='STR'))] + if isinstance(dt, TupleOf): + items = enumerate(dt.members) + elif isinstance(dt, StructOf): + items = dt.members.items() + else: + return [] # ArrayType, BlobType and TextType are ignored: too much data, probably not used + result = [] + for subkey, elmtype in items: + for fun, tail_, opts in make_cvt_list(elmtype, '%s.%s' % (tail, subkey)): + result.append((lambda v, k=subkey, f=fun: f(v[k]), tail_, opts)) + return result + + +class FrappyHistoryWriter(frappyhistory.FrappyWriter): + """extend writer to be used as an internal frappy connection + + API of frappyhistory.FrappyWriter: + + :meth:`put_def`(key, opts): + + define or overwrite a new curve named with options from dict + options: + + - type: + 'NUM' (any number) or 'STR' (text) + remark: tuples and structs create multiple curves + - period: + the typical 'lifetime' of a value. + The intention is, that points in a chart may be connected by a straight line + when the distance is lower than this value. If not, the line should be drawn + horizontally from the last point to a point before the next value. + For example a setpoint should have period 0, which will lead to a stepped + line, whereas for a measured value like a temperature, period should be + slightly bigger than the poll interval. In order to make full use of this, + we would need some additional parameter property. + - show: True/False, whether this curve should be shown or not by default in + a summary chart + - label: a label for the curve in the chart + + :meth:`put`(timestamp, key, value) + + timestamp: the timestamp. must not decrease! + key: the curve name + value: the value to be stored, converted to a string. '' indicates an undefined value + + self.cache is a dict of , containing the last used value + """ + def __init__(self, directory, predefined_names, dispatcher): + super().__init__(directory) + self.predefined_names = predefined_names + self.cvt_lists = {} # dict of + self.activated = False + self.dispatcher = dispatcher + self._init_time = None + + def init(self, msg): + """initialize from the 'describing' message""" + action, _, description = msg + assert action == 'describing' + self._init_time = time.time() + + for modname, moddesc in description['modules'].items(): + for pname, pdesc in moddesc['accessibles'].items(): + ident = key = modname + ':' + pname + if pname.startswith('_') and pname[1:] not in self.predefined_names: + key = modname + ':' + pname[1:] + dt = get_datatype(pdesc['datainfo']) + cvt_list = make_cvt_list(dt, key) + for _, hkey, opts in cvt_list: + if pname == 'value': + opts['period'] = opts.get('period', 0) + opts['show'] = True + opts['label'] = modname + elif pname == 'target': + opts['period'] = 0 + opts['label'] = modname + '_target' + opts['show'] = True + self.put_def(hkey, opts) + self.cvt_lists[ident] = cvt_list + # self.put(self._init_time, 'STR', 'vars', ' '.join(vars)) + self.dispatcher.handle_activate(self, None, None) + self._init_time = None + + def send_reply(self, msg): + action, ident, value = msg + if not action.endswith('update'): + print('unknown async message %r' % msg) + return + now = self._init_time or time.time() # on initialisation, use the same timestamp for all + if action == 'update': + for fun, key, _ in self.cvt_lists[ident]: + # we only look at the value, qualifiers are ignored for now + # we do not use the timestamp here, as a potentially decreasing value might + # bring the reader software into trouble + self.put(now, key, str(fun(value[0]))) + + else: # error_update + for _, key, _ in self.cvt_lists[ident]: + old = self.cache.get(key) + if old is None: + return # ignore if this key is not yet used + self.put(now, key, '') diff --git a/secop_psi/FG_Lecroy_3000.py b/secop_psi/FG_Lecroy_3000.py index 3f23adb..875572c 100644 --- a/secop_psi/FG_Lecroy_3000.py +++ b/secop_psi/FG_Lecroy_3000.py @@ -21,21 +21,20 @@ """WAVE FUNCTION LECROY XX: SIGNAL GENERATOR""" from secop.core import Readable, Parameter, FloatRange, \ - HasIodev, IntRange, BoolType, EnumType, Module, Property + IntRange, BoolType, EnumType, Module, Property -class Channel(HasIodev, Module): +class Channel(Module): channel = Property('choose channel to manipulate', IntRange(1, 2)) - freq = Parameter('frequency', FloatRange(1e-6, 20e6, unit='Hz'), poll=True, initwrite=True, default=1000) amp = Parameter('exc_volt_int', FloatRange(0.00, 5, unit='Vrms'), poll=True, readonly=False, initwrite=True, default=0.1) - offset = Parameter('offset_volt_int', FloatRange(0.00, 10, unit='V'), + offset = Parameter('offset_volt_int', FloatRange(0.0, 10, unit='V'), poll=True, readonly=False, initwrite=True, default=0.0) wave = Parameter('type of wavefunction', EnumType('WaveFunction', SINE=1, SQUARE=2, RAMP=3, PULSE=4, NOISE=5, ARB=6, DC=7), - poll=True, readonly=False, default='SINE'), + poll=True, readonly=False, default='SINE') phase = Parameter('signal phase', FloatRange(0, 360, unit='deg'), poll=True, readonly=False, initwrite=True, default=0) enabled = Parameter('enable output channel', datatype=EnumType('OnOff', OFF=0, ON=1), @@ -47,7 +46,7 @@ class Channel(HasIodev, Module): return self.sendRecv('C%d:BSWV FRQ?' % self.channel) def write_target(self, value): - self.sendRecv('C%d:BSWV FRQ, %g' % (self.channel, str(value)+'Hz')) + self.sendRecv('C%d:BSWV FRQ, %gHz' % (self.channel, value)) return value # signal wavefunction parameter @@ -87,8 +86,7 @@ class Channel(HasIodev, Module): return self.sendRecv('C%d:BSWV PHSE?' % self.channel) def write_phase(self, value): - self.sendRecv('C%d:BSWV PHSE %g' % (self.channel, str(value))) - + self.sendRecv('C%d:BSWV PHSE %g' % (self.channel, value)) return value # dis/enable output channel @@ -104,11 +102,9 @@ class Channel(HasIodev, Module): class arg(Readable): pollerClass = None - value = Parameter(datatype=FloatRange(unit='')) class arg2(Readable): pollerClass = None - value = Parameter(datatype=BoolType()) diff --git a/secop_psi/SR_7270.py b/secop_psi/SR_7270.py index 7553b78..0031ff4 100644 --- a/secop_psi/SR_7270.py +++ b/secop_psi/SR_7270.py @@ -18,31 +18,192 @@ # Module authors: # Daniel Margineda # ***************************************************************************** -"""SIGNAL RECOVERY SR7270: lOCKIN AMPLIFIER FOR AC SUSCEPTIBILITY""" +"""Signal Recovery SR7270: lockin amplifier for AC susceptibility""" -from secop.core import FloatRange, HasIodev, \ - Parameter, Readable, StringIO, TupleOf +from secop.core import Readable, Parameter, Command, FloatRange, TupleOf, \ + HasIodev, StringIO, Attached, IntRange, BoolType, EnumType class SR7270(StringIO): - # end_of_line = '\x00' #termination line from maanual page 6.8 - end_of_line = '\n' + end_of_line = b'\x00' + + def communicate(self, command): # remove dash from terminator + reply = StringIO.communicate(self, command) + status = self._conn.readbytes(2, 0.1) # get the 2 status bytes + return reply + ';%d;%d' % tuple(status) class XY(HasIodev, Readable): + x = Attached() + y = Attached() + freq_arg = Attached() + amp_arg = Attached() + tc_arg = Attached() + phase_arg = Attached() + dac_arg = Attached() + + # parameters required an initial value but initwrite write the default value for polled parameters value = Parameter('X, Y', datatype=TupleOf(FloatRange(unit='V'), FloatRange(unit='V'))) - freq = Parameter('exc_freq_int', FloatRange(0.001,250e3,unit='Hz'), readonly=False, default=100) + freq = Parameter('exc_freq_int', + FloatRange(0.001, 250e3, unit='Hz'), + poll=True, readonly=False, initwrite=True, default=1000) + amp = Parameter('exc_volt_int', + FloatRange(0.00, 5, unit='Vrms'), + poll=True, readonly=False, initwrite=True, default=0.1) + range = Parameter('sensitivity value', FloatRange(0.00, 1, unit='V'), poll=True, default=1) + irange = Parameter('sensitivity index', IntRange(0, 27), poll=True, readonly=False, default=25) + autorange = Parameter('autorange_on', EnumType('autorange', off=0, soft=1, hard=2), + readonly=False, default=0, initwrite=True) + tc = Parameter('time constant value', FloatRange(10e-6, 100, unit='s'), poll=True, default=0.1) + itc = Parameter('time constant index', IntRange(0, 30), poll=True, readonly=False, initwrite=True, default=14) + nm = Parameter('noise mode', BoolType(), readonly=False, default=0) + phase = Parameter('Reference phase control', FloatRange(-360, 360, unit='deg'), + poll=True, readonly=False, initwrite=True, default=0) + vmode = Parameter('Voltage input configuration', IntRange(0, 3), readonly=False, default=3), + # dac = Parameter('output DAC channel value', datatype=TupleOf(IntRange(1, 4), FloatRange(0.0, 5000, unit='mV')), + # poll=True, readonly=False, initwrite=True, default=(3,0)) + dac = Parameter('output DAC channel value', FloatRange(-10000, 10000, unit='mV'), + poll=True, readonly=False, initwrite=True, default=0) iodevClass = SR7270 - def read_value(self): - reply = self.sendRecv('XY.').split('\x00')[-1] - return reply.split(',') - - def read_freq(self): - reply = self.sendRecv('OF.').split('\x00')[-1] + def comm(self, command): + reply, status, overload = self.sendRecv(command).split(';') + if overload != '0': + self.status = self.Status.WARN, 'overload %s' % overload + else: + self.status = self.Status.IDLE, '' return reply - def write_freq(self,value): - self.sendRecv('OF. %g' % value) + def read_value(self): + reply = self.comm('XY.').split(',') + x = float(reply[0]) + y = float(reply[1]) + if self.autorange == 1: # soft + if max(abs(x), abs(y)) >= 0.9*self.range and self.irange < 27: + self.write_irange(self.irange+1) + elif max(abs(x), abs(y)) <= 0.3*self.range and self.irange > 1: + self.write_irange(self.irange-1) + self._x.value = x # to update X,Y classes which will be the collected data. + self._y.value = y + return x, y + + def read_freq(self): + reply = self.comm('OF.') + return reply + + def write_freq(self, value): + self.comm('OF. %g' % value) return value + + def write_autorange(self, value): + if value == 2: # hard + self.comm('AS') # put hardware autorange on + self.comm('AUTOMATIC. 1') + else: + self.comm('AUTOMATIC. 0') + return value + + def read_autorange(self): + reply = self.comm('AUTOMATIC') + # determine hardware autorange + if reply == 1: # "hardware auto range is on" + return 2 # hard + if self.autorange == 0: # soft + return self.autorange() # read autorange + return reply # off + + # oscillator amplitude module + def read_amp(self): + reply = self.comm('OA.') + return reply + + def write_amp(self, value): + self.comm('OA. %g' % value) + return value + + # external output DAC + def read_dac(self): + # reply = self.comm('DAC %g' % channel) # failed to add the DAC channel you want to control + reply = self.comm('DAC 3') # stack to channel 3 + return reply + + def write_dac(self, value): + # self.comm('DAC %g %g' % channel % value) + self.comm('DAC 3 %g' % value) + return value + + # sensitivity module + def read_range(self): + reply = self.comm('SEN.') + return reply + + def write_irange(self, value): + self.comm('SEN %g' % value) + self.read_range() + return value + + def read_irange(self): + reply = self.comm('SEN') + return reply + + # time constant module/ noisemode off or 0 allows to use all the time constant range + def read_nm(self): + reply = self.comm('NOISEMODE') + return reply + + def write_nm(self, value): + self.comm('NOISEMODE %d' % int(value)) + self.read_nm() + return value + + def read_tc(self): + reply = self.comm('TC.') + return reply + + def write_itc(self, value): + self.comm('TC %g' % value) + self.read_tc() + return value + + def read_itc(self): + reply = self.comm('TC') + + return reply + + # phase and autophase + def read_phase(self): + reply = self.comm('REFP.') + return reply + + def write_phase(self, value): + self.comm('REFP %d' % round(1000*value, 0)) + self.read_phase() + return value + + @Command() + def aphase(self): + """auto phase""" + self.read_phase() + reply = self.comm('AQN') + self.read_phase() + + # voltage input configuration 0:grounded,1=A,2=B,3=A-B + # def read_vmode(self): + # reply = self.comm('VMODE') + # return reply + + def write_vmode(self, value): + self.comm('VMODE %d' % value) + # self.read_vmode() + return value + + +class Comp(Readable): + pollerClass = None + value = Parameter(datatype=FloatRange(unit='V')) + + +class arg(Readable): + pollerClass = None + value = Parameter(datatype=FloatRange(unit='')) diff --git a/secop_psi/ls370res.py b/secop_psi/ls370res.py index 26786ae..27e1a33 100644 --- a/secop_psi/ls370res.py +++ b/secop_psi/ls370res.py @@ -120,6 +120,11 @@ class Main(HasIodev, Drivable): self.status = [Status.BUSY, 'switching'] return channel + def write_autoscan(self, value): + scan.send_change(self, self.value, value) + # self.sendRecv('SCAN %d,%d;SCAN?' % (channel, self.autoscan)) + return value + class ResChannel(HasIodev, Readable): """temperature channel on Lakeshore 336""" @@ -158,16 +163,24 @@ class ResChannel(HasIodev, Readable): dwell = Parameter('dwell time with autoscan', datatype=FloatRange(1, 200), readonly=False, handler=inset) filter = Parameter('filter time', datatype=FloatRange(1, 200), readonly=False, handler=filterhdl) + _trigger_read = False + def initModule(self): self._main = self.DISPATCHER.get_module(self.main) self._main.register_channel(self) def read_value(self): - if self.channel != self._main.value: - return Done if not self.enabled: self.status = [self.Status.DISABLED, 'disabled'] return Done + if self.channel != self._main.value: + if self.channel == self._main.target: + self._trigger_read = True + return Done + if not self._trigger_read: + return Done + # we got here, when we missed the idle state of self._main + self._trigger_read = False result = self.sendRecv('RDGR?%d' % self.channel) result = float(result) if self.autorange == 'soft': diff --git a/secop_psi/mercury.py b/secop_psi/mercury.py new file mode 100644 index 0000000..0398cb0 --- /dev/null +++ b/secop_psi/mercury.py @@ -0,0 +1,432 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# ***************************************************************************** +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free Software +# Foundation; either version 2 of the License, or (at your option) any later +# version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# this program; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Module authors: +# Markus Zolliker +# ***************************************************************************** +"""oxford instruments mercury family""" + + +import math +import re +import time + +from secop.core import Drivable, HasIodev, \ + Parameter, Property, Readable, StringIO +from secop.datatypes import EnumType, FloatRange, StringType +from secop.errors import HardwareError + + +class MercuryIO(StringIO): + identification = [('*IDN?', r'IDN:OXFORD INSTRUMENTS:MERCURY*')] + + +VALUE_UNIT = re.compile(r'(.*\d)([A-Za-z]*)$') + + +def make_map(**kwds): + """create a dict converting internal names to values and vice versa""" + kwds.update({v: k for k, v in kwds.items()}) + return kwds + + +MODE_MAP = make_map(OFF=0, ON=1) +SAMPLE_RATE = make_map(OFF=1, ON=0) # invert the codes used by OI + + +class MercuryChannel(HasIodev): + slots = Property('''slot uids + + example: DB6.T1,DB1.H1 + slot ids for sensor (and control output)''', + StringType()) + channel_name = Parameter('mercury nick name', StringType()) + channel_type = '' #: channel type(s) for sensor (and control) + + def query(self, adr, value=None): + """get or set a parameter in mercury syntax + + :param adr: for example "TEMP:SIG:TEMP" + :param value: if given and not None, a write command is executed + :return: the value + + remark: the DEV: is added automatically, when adr starts with the channel type + in addition, when addr starts with '0:' or '1:', the channel type is added + """ + for i, (channel_type, slot) in enumerate(zip(self.channel_type.split(','), self.slots.split(','))): + if adr.startswith('%d:' % i): + adr = 'DEV:%s:%s:%s' % (slot, channel_type, adr[2:]) # assume i <= 9 + break + if adr.startswith(channel_type + ':'): + adr = 'DEV:%s:%s' % (slot, adr) + break + if value is not None: + try: + value = '%g' % value # this works for float, integers and enums + except ValueError: + value = str(value) # this alone would not work for enums, and not be nice for floats + cmd = 'SET:%s:%s' % (adr, value) + reply = self._iodev.communicate(cmd) + if reply != 'STAT:%s:VALID' % cmd: + raise HardwareError('bad response %r to %r' % (reply, cmd)) + # chain a read command anyway + cmd = 'READ:%s' % adr + reply = self._iodev.communicate(cmd) + head, _, result = reply.rpartition(':') + if head != 'STAT:%s' % adr: + raise HardwareError('bad response %r to %r' % (reply, cmd)) + match = VALUE_UNIT.match(result) + if match: # result can be interpreted as a float with optional units + return float(match.group(1)) + return result + + def read_channel_name(self): + return self.query('') + + +class TemperatureSensor(MercuryChannel, Readable): + channel_type = 'TEMP' + value = Parameter(unit='K') + raw = Parameter('raw value', FloatRange()) + + def read_value(self): + return self.query('TEMP:SIG:TEMP') + + def read_raw(self): + return self.query('TEMP:SIG:RES') + + +class HasProgressCheck: + """mixin for progress checks + + Implements progress checks based on tolerance, settling time and timeout. + The algorithm does its best to support changes of these parameters on the + fly. However, the full history is not considered, which means for example + that the spent time inside tolerance stored already is not altered when + changing tolerance. + """ + tolerance = Parameter('absolute tolerance', FloatRange(0), readonly=False, default=0) + relative_tolerance = Parameter('_', FloatRange(0, 1), readonly=False, default=0) + settling_time = Parameter( + '''settling time + + total amount of time the value has to be within tolerance before switching to idle. + ''', FloatRange(0), readonly=False, default=0) + timeout = Parameter( + '''timeout + + timeout = 0: disabled, else: + A timeout happens, when the difference value - target is not improved by more than + a factor 2 within timeout. + + More precisely, we expect a convergence curve which decreases the difference + by a factor 2 within timeout/2. + If this expected progress is delayed by more than timeout/2, a timeout happens. + If the convergence is better than above, the expected curve is adjusted continuously. + In case the tolerance is reached once, a timeout happens when the time after this is + exceeded by more than settling_time + timeout. + ''', FloatRange(0, unit='sec'), readonly=False, default=3600) + status = Parameter('status determined from progress check') + value = Parameter() + target = Parameter() + + _settling_start = None # supposed start of settling time (0 when outside) + _first_inside = None # first time within tolerance + _spent_inside = 0 # accumulated settling time + # the upper limit for t0, for the curve timeout_dif * 2 ** -(t - t0)/timeout not touching abs(value(t) - target) + _timeout_base = 0 + _timeout_dif = 1 + + def check_progress(self, value, target): + """called from read_status + + indented to be also be used for alterative implementations of read_status + """ + base = max(abs(target), abs(value)) + tol = base * self.relative_tolerance + self.tolerance + if tol == 0: + tol = max(0.01, base * 0.01) + now = time.time() + dif = abs(value - target) + if self._settling_start: # we were inside tol + self._spent_inside = now - self._settling_start + if dif > tol: # transition inside -> outside + self._settling_start = None + else: # we were outside tol + if dif <= tol: # transition outside -> inside + if not self._first_inside: + self._first_inside = now + self._settling_start = now - self._spent_inside + if self._spent_inside > self.settling_time: + return 'IDLE', '' + result = 'BUSY', ('inside tolerance' if self._settling_start else 'outside tolerance') + if self.timeout: + if self._first_inside: + if now > self._first_inside + self.settling_time + self.timeout: + return 'WARNING', 'settling timeout' + return result + tmo2 = self.timeout / 2 + + def exponential_convergence(t): + return self._timeout_dif * 2 ** -(t - self._timeout_base) / tmo2 + + if dif < exponential_convergence(now): + # convergence is better than estimated, update expected curve + self._timeout_dif = dif + self._timeout_base = now + elif dif > exponential_convergence(now - tmo2): + return 'WARNING', 'convergence timeout' + return result + + def reset_progress(self, value, target): + """must be called from write_target, whenever the target changes""" + self._settling_start = None + self._first_inside = None + self._spent_inside = 0 + self._timeout_base = time.time() + self._timeout_dif = abs(value - target) + + def read_status(self): + if self.status[0] == 'IDLE': + # do not change when idle already + return self.status + return self.check_progress(self.value, self.target) + + def write_target(self, value): + raise NotImplementedError() + + +class Loop(HasProgressCheck, MercuryChannel): + """common base class for loops""" + mode = Parameter('control mode', EnumType(manual=0, pid=1), readonly=False) + prop = Parameter('pid proportional band', FloatRange(), readonly=False) + integ = Parameter('pid integral parameter', FloatRange(unit='min'), readonly=False) + deriv = Parameter('pid differential parameter', FloatRange(unit='min'), readonly=False) + """pid = Parameter('control parameters', StructOf(p=FloatRange(), i=FloatRange(), d=FloatRange()),readonly=False)""" + pid_table_mode = Parameter('', EnumType(off=0, on=1), readonly=False) + + def read_prop(self): + return self.query('0:LOOP:P') + + def read_integ(self): + return self.query('0:LOOP:I') + + def read_deriv(self): + return self.query('0:LOOP:D') + + def write_prop(self, value): + return self.query('0:LOOP:P', value) + + def write_integ(self, value): + return self.query('0:LOOP:I', value) + + def write_deriv(self, value): + return self.query('0:LOOP:D', value) + + def read_enable_pid_table(self): + return self.query('0:LOOP:PIDT').lower() + + def write_enable_pid_table(self, value): + return self.query('0:LOOP:PIDT', value.upper()).lower() + + def read_mode(self): + return MODE_MAP[self.query('0:LOOP:ENAB')] + + def write_mode(self, value): + if value == 'manual': + self.status = 'IDLE', 'manual mode' + elif self.status[0] == 'IDLE': + self.status = 'IDLE', '' + return MODE_MAP[self.query('0:LOOP:ENAB', value)] + + def write_target(self, value): + raise NotImplementedError + + # def read_pid(self): + # # read all in one go, in order to reduce comm. traffic + # cmd = 'READ:DEV:%s:TEMP:LOOP:P:I:D' % self.slots.split(',')[0] + # reply = self._iodev.communicate(cmd) + # result = list(reply.split(':')) + # pid = result[6::2] + # del result[6::2] + # if ':'.join(result) != cmd: + # raise HardwareError('bad response %r to %r' % (reply, cmd)) + # return dict(zip('pid', pid)) + # + # def write_pid(self, value): + # # for simplicity use single writes + # return {k: self.query('LOOP:%s' % k.upper(), value[k]) for k in 'pid'} + + +class TemperatureLoop(Loop, TemperatureSensor, Drivable): + channel_type = 'TEMP,HTR' + heater_limit = Parameter('heater output limit', FloatRange(0, 100, unit='W'), readonly=False) + heater_resistivity = Parameter('heater resistivity', FloatRange(10, 1000, unit='Ohm'), readonly=False) + ramp = Parameter('ramp rate', FloatRange(0, unit='K/min'), readonly=False) + enable_ramp = Parameter('enable ramp rate', EnumType(off=0, on=1), readonly=False) + auto_flow = Parameter('enable auto flow', EnumType(off=0, on=1), readonly=False) + heater_output = Parameter('heater output', FloatRange(0, 100, unit='W'), readonly=False) + + def read_heater_limit(self): + return self.query('HTR:VLIM') ** 2 / self.heater_resistivity + + def write_heater_limit(self, value): + result = self.query('HTR:VLIM', math.sqrt(value * self.heater_resistivity)) + return result ** 2 / self.heater_resistivity + + def read_heater_resistivity(self): + value = self.query('HTR:RES') + if value: + return value + return self.heater_resistivity + + def write_heater_resistivity(self, value): + return self.query('HTR:RES', value) + + def read_enable_ramp(self): + return self.query('TEMP:LOOP:RENA').lower() + + def write_enable_ramp(self, value): + return self.query('TEMP:LOOP:RENA', EnumType(off=0, on=1)(value).name).lower() + + def read_auto_flow(self): + return self.query('TEMP:LOOP:FAUT').lower() + + def write_auto_flow(self, value): + return self.query('TEMP:LOOP:FAUT', EnumType(off=0, on=1)(value).name).lower() + + def read_ramp(self): + return self.query('TEMP:LOOP:RSET') + + def write_ramp(self, value): + if not value: + self.write_enable_ramp(0) + return 0 + if value: + self.write_enable_ramp(1) + return self.query('TEMP:LOOP:RSET', value) + + def read_target(self): + # TODO: check about working setpoint + return self.query('TEMP:LOOP:TSET') + + def write_target(self, value): + if self.mode != 'pid': + self.log.warning('switch to pid loop mode') + self.write_mode('pid') + self.reset_progress(self.value, value) + return self.query('TEMP:LOOP:TSET', value) + + def read_heater_output(self): + # TODO: check that this really works, else next line + return self.query('HTR:SIG:POWR') + # return self.query('HTR:SIG:VOLT') ** 2 / self.heater_resistivity + + def write_heater_output(self, value): + if self.mode != 'manual': + self.log.warning('switch to manual heater mode') + self.write_mode('manual') + return self.query('HTR:SIG:VOLT', math.sqrt(value * self.heater_resistivity)) + + +class PressureSensor(MercuryChannel, Readable): + channel_type = 'PRES' + value = Parameter(unit='mbar') + + def read_value(self): + return self.query('PRES:SIG:PRES') + + +class PressureLoop(Loop, PressureSensor, Drivable): + channel_type = 'PRES,AUX' + + valve_pos = Parameter('valve position', FloatRange(0, 100, unit='%'), readonly=False) + + def read_valve_pos(self): + return self.query('AUX:SIG:PERC') + + def write_valve_pos(self, value): + if self.mode != 'manual': + self.log.warning('switch to manual valve mode') + self.write_mode('manual') + return self.query('AUX:SIG:PERC', value) + + def write_target(self, value): + self.reset_progress(self.value, value) + return self.query('PRES:LOOP:PRST', value) + + +class HeLevel(MercuryChannel, Readable): + channel_type = 'LVL' + sample_rate = Parameter('_', EnumType(slow=0, fast=1), readonly=False, poll=True) + hysteresis = Parameter('hysteresis for detection of increase', FloatRange(0, 100, unit='%'), readonly=False) + fast_timeout = Parameter('timeout for switching to slow', FloatRange(0, unit='sec'), readonly=False) + _min_level = 200 + _max_level = -100 + _last_increase = None # None when in slow mode, last increase time in fast mode + + def check_rate(self, sample_rate): + """check changes in rate + + :param sample_rate: (int or enum) 0: slow, 1: fast + initialize affected attributes + """ + if sample_rate != 0: # fast + if not self._last_increase: + self._last_increase = time.time() + self._max_level = -100 + elif self._last_increase: + self._last_increase = None + self._min_level = 200 + return sample_rate + + def read_sample_rate(self): + return self.check_rate(SAMPLE_RATE[self.query('LVL:HEL:PULS:SLOW')]) + + def write_sample_rate(self, value): + self.check_rate(value) + return SAMPLE_RATE[self.query('LVL:HEL:PULS:SLOW', SAMPLE_RATE[value])] + + def read_value(self): + level = self.query('LVL:SIG:HEL:LEV') + # handle automatic switching depending on increase + now = time.time() + if self._last_increase: # fast mode + if level > self._max_level: + self._last_increase = now + self._max_level = level + elif now > self._last_increase + self.fast_timeout: + # no increase since fast timeout -> slow + self.write_sample_rate('slow') + else: + if level > self._min_level + self.hysteresis: + # substantial increase -> fast + self.write_sample_rate('fast') + else: + self._min_level = min(self._min_level, level) + return level + + +class N2Level(MercuryChannel, Readable): + channel_type = 'LVL' + + def read_value(self): + return self.query('LVL:SIG:NIT:LEV') + + +class MagnetOutput(MercuryChannel, Drivable): + pass