Merge branch 'wip'

Change-Id: I3ba389775ce9b02269ca9c20a42ec1bddf6c5d21
This commit is contained in:
zolliker 2021-03-03 09:46:20 +01:00
commit b4bb172ada
6 changed files with 818 additions and 28 deletions

View File

@ -3,9 +3,57 @@
current running code at SINQ, with newest changes not yet pushed
through the Gerrit workflow at MLZ
## Branches
branches:
- mlz: master from forge.frm2.tum.de:29418/sine2020/secop/playground
- master: the same as above, but with origin: git.psi.ch:sinqdev/frappy.git
- from-mlz: master from forge.frm2.tum.de:29418/sine2020/secop/playground
this is not present at git.psi.ch:sinqdev/frappy.git!
- mlz: keep in sync with from-mlz before pushing (origin git.psi.ch:sinqdev/frappy.git)
- master: the last synced state between mlz and wip/work
(this does NOT contain local repo files only, however, all common files work/mlz should match)
- work: current working version, usually in use on /home/l_samenv/frappy (and on neutron instruments)
this should be a copy of an earlier state of the wip branch
- wip: current test version, usually in use on /home/l_samenv/frappy_wip
master --> mlz # these branches match after a sync step, but they might have a different history
master --> work --> wip
apply commits from mlz to master: (rebase ?) or use cherry-pick:
git cherry-pick <sha1>..<sha2>
where sha1 is the last commit already in wip, and sha2 ist the last commit to be applied
(for a single commit <sha1>.. may be omitted)
the wip branch is also present in an other directory (currently zolliker/switchdrive/gitmlz/frappy),
where commits may be cherry picked for input to Gerrit. As generally in the review process some additonal
changes are done, eventually a sync step should happen:
1) ideally, this is done when work and wip match
1) make copies of branches master, work and wip
2) pull changes from mlz repo to from-mlz branch: git checkout from-mlz; git pull
3) copy to from-mlz to mlz branch: git checkout mlz; git pull; git checkout from-mlz; git checkout -B mlz; git push
4) cherry-pick commits (from mlz) to master (git checkout master; git pull before)
5) copy master branch to work with 'git checkout -B work'.
Not sure if this works, as work is to be pushed to git.psi.ch.
We might first remove the remote branch with 'git push origin --delete work'.
And then create again (git push origin work)?
6) in work: cherry-pick commits not yet feeded into Gerrit from copy in step (1)
7) git checkout -B wip
8) if wip and work did not match: cherry pick changes from wip copy to wip or merge
8) delete branch copies not needed any more
## Procedure to update PPMS
1) git checkout wip (or work, whatever state to copy to ppms)
2) git checkout -B ppms # local branch ?
3) assume PPMSData is mounted on /Volumes/PPMSData
cp -r secop_psi /Volumes/PPMSData/zolliker/frappy/secop_psi
cp -r secop /Volumes/PPMSData/zolliker/frappy/secop
it may be that additional folder have to copied ...

140
secop/historywriter.py Normal file
View File

@ -0,0 +1,140 @@
# -*- coding: utf-8 -*-
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
# *****************************************************************************
import time
import frappyhistory # pylint: disable=import-error
from secop.datatypes import get_datatype, IntRange, FloatRange, ScaledInteger,\
EnumType, BoolType, StringType, TupleOf, StructOf
def make_cvt_list(dt, tail=''):
"""create conversion list
list of tuple (<conversion function>, <tail>, <curve options>)
tail is a postfix to be appended in case of tuples and structs
"""
if isinstance(dt, (EnumType, IntRange, BoolType)):
return[(int, tail, dict(type='NUM'))]
if isinstance(dt, (FloatRange, ScaledInteger)):
return [(dt.import_value, tail, dict(type='NUM', unit=dt.unit, period=5) if dt.unit else {})]
if isinstance(dt, StringType):
return [(lambda x: x, tail, dict(type='STR'))]
if isinstance(dt, TupleOf):
items = enumerate(dt.members)
elif isinstance(dt, StructOf):
items = dt.members.items()
else:
return [] # ArrayType, BlobType and TextType are ignored: too much data, probably not used
result = []
for subkey, elmtype in items:
for fun, tail_, opts in make_cvt_list(elmtype, '%s.%s' % (tail, subkey)):
result.append((lambda v, k=subkey, f=fun: f(v[k]), tail_, opts))
return result
class FrappyHistoryWriter(frappyhistory.FrappyWriter):
"""extend writer to be used as an internal frappy connection
API of frappyhistory.FrappyWriter:
:meth:`put_def`(key, opts):
define or overwrite a new curve named <key> with options from dict <opts>
options:
- type:
'NUM' (any number) or 'STR' (text)
remark: tuples and structs create multiple curves
- period:
the typical 'lifetime' of a value.
The intention is, that points in a chart may be connected by a straight line
when the distance is lower than this value. If not, the line should be drawn
horizontally from the last point to a point <period> before the next value.
For example a setpoint should have period 0, which will lead to a stepped
line, whereas for a measured value like a temperature, period should be
slightly bigger than the poll interval. In order to make full use of this,
we would need some additional parameter property.
- show: True/False, whether this curve should be shown or not by default in
a summary chart
- label: a label for the curve in the chart
:meth:`put`(timestamp, key, value)
timestamp: the timestamp. must not decrease!
key: the curve name
value: the value to be stored, converted to a string. '' indicates an undefined value
self.cache is a dict <key> of <value as string>, containing the last used value
"""
def __init__(self, directory, predefined_names, dispatcher):
super().__init__(directory)
self.predefined_names = predefined_names
self.cvt_lists = {} # dict <mod:param> of <conversion list>
self.activated = False
self.dispatcher = dispatcher
self._init_time = None
def init(self, msg):
"""initialize from the 'describing' message"""
action, _, description = msg
assert action == 'describing'
self._init_time = time.time()
for modname, moddesc in description['modules'].items():
for pname, pdesc in moddesc['accessibles'].items():
ident = key = modname + ':' + pname
if pname.startswith('_') and pname[1:] not in self.predefined_names:
key = modname + ':' + pname[1:]
dt = get_datatype(pdesc['datainfo'])
cvt_list = make_cvt_list(dt, key)
for _, hkey, opts in cvt_list:
if pname == 'value':
opts['period'] = opts.get('period', 0)
opts['show'] = True
opts['label'] = modname
elif pname == 'target':
opts['period'] = 0
opts['label'] = modname + '_target'
opts['show'] = True
self.put_def(hkey, opts)
self.cvt_lists[ident] = cvt_list
# self.put(self._init_time, 'STR', 'vars', ' '.join(vars))
self.dispatcher.handle_activate(self, None, None)
self._init_time = None
def send_reply(self, msg):
action, ident, value = msg
if not action.endswith('update'):
print('unknown async message %r' % msg)
return
now = self._init_time or time.time() # on initialisation, use the same timestamp for all
if action == 'update':
for fun, key, _ in self.cvt_lists[ident]:
# we only look at the value, qualifiers are ignored for now
# we do not use the timestamp here, as a potentially decreasing value might
# bring the reader software into trouble
self.put(now, key, str(fun(value[0])))
else: # error_update
for _, key, _ in self.cvt_lists[ident]:
old = self.cache.get(key)
if old is None:
return # ignore if this key is not yet used
self.put(now, key, '')

View File

@ -21,21 +21,20 @@
"""WAVE FUNCTION LECROY XX: SIGNAL GENERATOR"""
from secop.core import Readable, Parameter, FloatRange, \
HasIodev, IntRange, BoolType, EnumType, Module, Property
IntRange, BoolType, EnumType, Module, Property
class Channel(HasIodev, Module):
class Channel(Module):
channel = Property('choose channel to manipulate', IntRange(1, 2))
freq = Parameter('frequency', FloatRange(1e-6, 20e6, unit='Hz'),
poll=True, initwrite=True, default=1000)
amp = Parameter('exc_volt_int', FloatRange(0.00, 5, unit='Vrms'),
poll=True, readonly=False, initwrite=True, default=0.1)
offset = Parameter('offset_volt_int', FloatRange(0.00, 10, unit='V'),
offset = Parameter('offset_volt_int', FloatRange(0.0, 10, unit='V'),
poll=True, readonly=False, initwrite=True, default=0.0)
wave = Parameter('type of wavefunction',
EnumType('WaveFunction', SINE=1, SQUARE=2, RAMP=3, PULSE=4, NOISE=5, ARB=6, DC=7),
poll=True, readonly=False, default='SINE'),
poll=True, readonly=False, default='SINE')
phase = Parameter('signal phase', FloatRange(0, 360, unit='deg'),
poll=True, readonly=False, initwrite=True, default=0)
enabled = Parameter('enable output channel', datatype=EnumType('OnOff', OFF=0, ON=1),
@ -47,7 +46,7 @@ class Channel(HasIodev, Module):
return self.sendRecv('C%d:BSWV FRQ?' % self.channel)
def write_target(self, value):
self.sendRecv('C%d:BSWV FRQ, %g' % (self.channel, str(value)+'Hz'))
self.sendRecv('C%d:BSWV FRQ, %gHz' % (self.channel, value))
return value
# signal wavefunction parameter
@ -87,8 +86,7 @@ class Channel(HasIodev, Module):
return self.sendRecv('C%d:BSWV PHSE?' % self.channel)
def write_phase(self, value):
self.sendRecv('C%d:BSWV PHSE %g' % (self.channel, str(value)))
self.sendRecv('C%d:BSWV PHSE %g' % (self.channel, value))
return value
# dis/enable output channel
@ -104,11 +102,9 @@ class Channel(HasIodev, Module):
class arg(Readable):
pollerClass = None
value = Parameter(datatype=FloatRange(unit=''))
class arg2(Readable):
pollerClass = None
value = Parameter(datatype=BoolType())

View File

@ -18,31 +18,192 @@
# Module authors:
# Daniel Margineda <daniel.margineda@psi.ch>
# *****************************************************************************
"""SIGNAL RECOVERY SR7270: lOCKIN AMPLIFIER FOR AC SUSCEPTIBILITY"""
"""Signal Recovery SR7270: lockin amplifier for AC susceptibility"""
from secop.core import FloatRange, HasIodev, \
Parameter, Readable, StringIO, TupleOf
from secop.core import Readable, Parameter, Command, FloatRange, TupleOf, \
HasIodev, StringIO, Attached, IntRange, BoolType, EnumType
class SR7270(StringIO):
# end_of_line = '\x00' #termination line from maanual page 6.8
end_of_line = '\n'
end_of_line = b'\x00'
def communicate(self, command): # remove dash from terminator
reply = StringIO.communicate(self, command)
status = self._conn.readbytes(2, 0.1) # get the 2 status bytes
return reply + ';%d;%d' % tuple(status)
class XY(HasIodev, Readable):
x = Attached()
y = Attached()
freq_arg = Attached()
amp_arg = Attached()
tc_arg = Attached()
phase_arg = Attached()
dac_arg = Attached()
# parameters required an initial value but initwrite write the default value for polled parameters
value = Parameter('X, Y', datatype=TupleOf(FloatRange(unit='V'), FloatRange(unit='V')))
freq = Parameter('exc_freq_int', FloatRange(0.001,250e3,unit='Hz'), readonly=False, default=100)
freq = Parameter('exc_freq_int',
FloatRange(0.001, 250e3, unit='Hz'),
poll=True, readonly=False, initwrite=True, default=1000)
amp = Parameter('exc_volt_int',
FloatRange(0.00, 5, unit='Vrms'),
poll=True, readonly=False, initwrite=True, default=0.1)
range = Parameter('sensitivity value', FloatRange(0.00, 1, unit='V'), poll=True, default=1)
irange = Parameter('sensitivity index', IntRange(0, 27), poll=True, readonly=False, default=25)
autorange = Parameter('autorange_on', EnumType('autorange', off=0, soft=1, hard=2),
readonly=False, default=0, initwrite=True)
tc = Parameter('time constant value', FloatRange(10e-6, 100, unit='s'), poll=True, default=0.1)
itc = Parameter('time constant index', IntRange(0, 30), poll=True, readonly=False, initwrite=True, default=14)
nm = Parameter('noise mode', BoolType(), readonly=False, default=0)
phase = Parameter('Reference phase control', FloatRange(-360, 360, unit='deg'),
poll=True, readonly=False, initwrite=True, default=0)
vmode = Parameter('Voltage input configuration', IntRange(0, 3), readonly=False, default=3),
# dac = Parameter('output DAC channel value', datatype=TupleOf(IntRange(1, 4), FloatRange(0.0, 5000, unit='mV')),
# poll=True, readonly=False, initwrite=True, default=(3,0))
dac = Parameter('output DAC channel value', FloatRange(-10000, 10000, unit='mV'),
poll=True, readonly=False, initwrite=True, default=0)
iodevClass = SR7270
def comm(self, command):
reply, status, overload = self.sendRecv(command).split(';')
if overload != '0':
self.status = self.Status.WARN, 'overload %s' % overload
else:
self.status = self.Status.IDLE, ''
return reply
def read_value(self):
reply = self.sendRecv('XY.').split('\x00')[-1]
return reply.split(',')
reply = self.comm('XY.').split(',')
x = float(reply[0])
y = float(reply[1])
if self.autorange == 1: # soft
if max(abs(x), abs(y)) >= 0.9*self.range and self.irange < 27:
self.write_irange(self.irange+1)
elif max(abs(x), abs(y)) <= 0.3*self.range and self.irange > 1:
self.write_irange(self.irange-1)
self._x.value = x # to update X,Y classes which will be the collected data.
self._y.value = y
return x, y
def read_freq(self):
reply = self.sendRecv('OF.').split('\x00')[-1]
reply = self.comm('OF.')
return reply
def write_freq(self, value):
self.sendRecv('OF. %g' % value)
self.comm('OF. %g' % value)
return value
def write_autorange(self, value):
if value == 2: # hard
self.comm('AS') # put hardware autorange on
self.comm('AUTOMATIC. 1')
else:
self.comm('AUTOMATIC. 0')
return value
def read_autorange(self):
reply = self.comm('AUTOMATIC')
# determine hardware autorange
if reply == 1: # "hardware auto range is on"
return 2 # hard
if self.autorange == 0: # soft
return self.autorange() # read autorange
return reply # off
# oscillator amplitude module
def read_amp(self):
reply = self.comm('OA.')
return reply
def write_amp(self, value):
self.comm('OA. %g' % value)
return value
# external output DAC
def read_dac(self):
# reply = self.comm('DAC %g' % channel) # failed to add the DAC channel you want to control
reply = self.comm('DAC 3') # stack to channel 3
return reply
def write_dac(self, value):
# self.comm('DAC %g %g' % channel % value)
self.comm('DAC 3 %g' % value)
return value
# sensitivity module
def read_range(self):
reply = self.comm('SEN.')
return reply
def write_irange(self, value):
self.comm('SEN %g' % value)
self.read_range()
return value
def read_irange(self):
reply = self.comm('SEN')
return reply
# time constant module/ noisemode off or 0 allows to use all the time constant range
def read_nm(self):
reply = self.comm('NOISEMODE')
return reply
def write_nm(self, value):
self.comm('NOISEMODE %d' % int(value))
self.read_nm()
return value
def read_tc(self):
reply = self.comm('TC.')
return reply
def write_itc(self, value):
self.comm('TC %g' % value)
self.read_tc()
return value
def read_itc(self):
reply = self.comm('TC')
return reply
# phase and autophase
def read_phase(self):
reply = self.comm('REFP.')
return reply
def write_phase(self, value):
self.comm('REFP %d' % round(1000*value, 0))
self.read_phase()
return value
@Command()
def aphase(self):
"""auto phase"""
self.read_phase()
reply = self.comm('AQN')
self.read_phase()
# voltage input configuration 0:grounded,1=A,2=B,3=A-B
# def read_vmode(self):
# reply = self.comm('VMODE')
# return reply
def write_vmode(self, value):
self.comm('VMODE %d' % value)
# self.read_vmode()
return value
class Comp(Readable):
pollerClass = None
value = Parameter(datatype=FloatRange(unit='V'))
class arg(Readable):
pollerClass = None
value = Parameter(datatype=FloatRange(unit=''))

View File

@ -120,6 +120,11 @@ class Main(HasIodev, Drivable):
self.status = [Status.BUSY, 'switching']
return channel
def write_autoscan(self, value):
scan.send_change(self, self.value, value)
# self.sendRecv('SCAN %d,%d;SCAN?' % (channel, self.autoscan))
return value
class ResChannel(HasIodev, Readable):
"""temperature channel on Lakeshore 336"""
@ -158,16 +163,24 @@ class ResChannel(HasIodev, Readable):
dwell = Parameter('dwell time with autoscan', datatype=FloatRange(1, 200), readonly=False, handler=inset)
filter = Parameter('filter time', datatype=FloatRange(1, 200), readonly=False, handler=filterhdl)
_trigger_read = False
def initModule(self):
self._main = self.DISPATCHER.get_module(self.main)
self._main.register_channel(self)
def read_value(self):
if self.channel != self._main.value:
return Done
if not self.enabled:
self.status = [self.Status.DISABLED, 'disabled']
return Done
if self.channel != self._main.value:
if self.channel == self._main.target:
self._trigger_read = True
return Done
if not self._trigger_read:
return Done
# we got here, when we missed the idle state of self._main
self._trigger_read = False
result = self.sendRecv('RDGR?%d' % self.channel)
result = float(result)
if self.autorange == 'soft':

432
secop_psi/mercury.py Normal file
View File

@ -0,0 +1,432 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
# *****************************************************************************
"""oxford instruments mercury family"""
import math
import re
import time
from secop.core import Drivable, HasIodev, \
Parameter, Property, Readable, StringIO
from secop.datatypes import EnumType, FloatRange, StringType
from secop.errors import HardwareError
class MercuryIO(StringIO):
identification = [('*IDN?', r'IDN:OXFORD INSTRUMENTS:MERCURY*')]
VALUE_UNIT = re.compile(r'(.*\d)([A-Za-z]*)$')
def make_map(**kwds):
"""create a dict converting internal names to values and vice versa"""
kwds.update({v: k for k, v in kwds.items()})
return kwds
MODE_MAP = make_map(OFF=0, ON=1)
SAMPLE_RATE = make_map(OFF=1, ON=0) # invert the codes used by OI
class MercuryChannel(HasIodev):
slots = Property('''slot uids
example: DB6.T1,DB1.H1
slot ids for sensor (and control output)''',
StringType())
channel_name = Parameter('mercury nick name', StringType())
channel_type = '' #: channel type(s) for sensor (and control)
def query(self, adr, value=None):
"""get or set a parameter in mercury syntax
:param adr: for example "TEMP:SIG:TEMP"
:param value: if given and not None, a write command is executed
:return: the value
remark: the DEV:<slot> is added automatically, when adr starts with the channel type
in addition, when addr starts with '0:' or '1:', the channel type is added
"""
for i, (channel_type, slot) in enumerate(zip(self.channel_type.split(','), self.slots.split(','))):
if adr.startswith('%d:' % i):
adr = 'DEV:%s:%s:%s' % (slot, channel_type, adr[2:]) # assume i <= 9
break
if adr.startswith(channel_type + ':'):
adr = 'DEV:%s:%s' % (slot, adr)
break
if value is not None:
try:
value = '%g' % value # this works for float, integers and enums
except ValueError:
value = str(value) # this alone would not work for enums, and not be nice for floats
cmd = 'SET:%s:%s' % (adr, value)
reply = self._iodev.communicate(cmd)
if reply != 'STAT:%s:VALID' % cmd:
raise HardwareError('bad response %r to %r' % (reply, cmd))
# chain a read command anyway
cmd = 'READ:%s' % adr
reply = self._iodev.communicate(cmd)
head, _, result = reply.rpartition(':')
if head != 'STAT:%s' % adr:
raise HardwareError('bad response %r to %r' % (reply, cmd))
match = VALUE_UNIT.match(result)
if match: # result can be interpreted as a float with optional units
return float(match.group(1))
return result
def read_channel_name(self):
return self.query('')
class TemperatureSensor(MercuryChannel, Readable):
channel_type = 'TEMP'
value = Parameter(unit='K')
raw = Parameter('raw value', FloatRange())
def read_value(self):
return self.query('TEMP:SIG:TEMP')
def read_raw(self):
return self.query('TEMP:SIG:RES')
class HasProgressCheck:
"""mixin for progress checks
Implements progress checks based on tolerance, settling time and timeout.
The algorithm does its best to support changes of these parameters on the
fly. However, the full history is not considered, which means for example
that the spent time inside tolerance stored already is not altered when
changing tolerance.
"""
tolerance = Parameter('absolute tolerance', FloatRange(0), readonly=False, default=0)
relative_tolerance = Parameter('_', FloatRange(0, 1), readonly=False, default=0)
settling_time = Parameter(
'''settling time
total amount of time the value has to be within tolerance before switching to idle.
''', FloatRange(0), readonly=False, default=0)
timeout = Parameter(
'''timeout
timeout = 0: disabled, else:
A timeout happens, when the difference value - target is not improved by more than
a factor 2 within timeout.
More precisely, we expect a convergence curve which decreases the difference
by a factor 2 within timeout/2.
If this expected progress is delayed by more than timeout/2, a timeout happens.
If the convergence is better than above, the expected curve is adjusted continuously.
In case the tolerance is reached once, a timeout happens when the time after this is
exceeded by more than settling_time + timeout.
''', FloatRange(0, unit='sec'), readonly=False, default=3600)
status = Parameter('status determined from progress check')
value = Parameter()
target = Parameter()
_settling_start = None # supposed start of settling time (0 when outside)
_first_inside = None # first time within tolerance
_spent_inside = 0 # accumulated settling time
# the upper limit for t0, for the curve timeout_dif * 2 ** -(t - t0)/timeout not touching abs(value(t) - target)
_timeout_base = 0
_timeout_dif = 1
def check_progress(self, value, target):
"""called from read_status
indented to be also be used for alterative implementations of read_status
"""
base = max(abs(target), abs(value))
tol = base * self.relative_tolerance + self.tolerance
if tol == 0:
tol = max(0.01, base * 0.01)
now = time.time()
dif = abs(value - target)
if self._settling_start: # we were inside tol
self._spent_inside = now - self._settling_start
if dif > tol: # transition inside -> outside
self._settling_start = None
else: # we were outside tol
if dif <= tol: # transition outside -> inside
if not self._first_inside:
self._first_inside = now
self._settling_start = now - self._spent_inside
if self._spent_inside > self.settling_time:
return 'IDLE', ''
result = 'BUSY', ('inside tolerance' if self._settling_start else 'outside tolerance')
if self.timeout:
if self._first_inside:
if now > self._first_inside + self.settling_time + self.timeout:
return 'WARNING', 'settling timeout'
return result
tmo2 = self.timeout / 2
def exponential_convergence(t):
return self._timeout_dif * 2 ** -(t - self._timeout_base) / tmo2
if dif < exponential_convergence(now):
# convergence is better than estimated, update expected curve
self._timeout_dif = dif
self._timeout_base = now
elif dif > exponential_convergence(now - tmo2):
return 'WARNING', 'convergence timeout'
return result
def reset_progress(self, value, target):
"""must be called from write_target, whenever the target changes"""
self._settling_start = None
self._first_inside = None
self._spent_inside = 0
self._timeout_base = time.time()
self._timeout_dif = abs(value - target)
def read_status(self):
if self.status[0] == 'IDLE':
# do not change when idle already
return self.status
return self.check_progress(self.value, self.target)
def write_target(self, value):
raise NotImplementedError()
class Loop(HasProgressCheck, MercuryChannel):
"""common base class for loops"""
mode = Parameter('control mode', EnumType(manual=0, pid=1), readonly=False)
prop = Parameter('pid proportional band', FloatRange(), readonly=False)
integ = Parameter('pid integral parameter', FloatRange(unit='min'), readonly=False)
deriv = Parameter('pid differential parameter', FloatRange(unit='min'), readonly=False)
"""pid = Parameter('control parameters', StructOf(p=FloatRange(), i=FloatRange(), d=FloatRange()),readonly=False)"""
pid_table_mode = Parameter('', EnumType(off=0, on=1), readonly=False)
def read_prop(self):
return self.query('0:LOOP:P')
def read_integ(self):
return self.query('0:LOOP:I')
def read_deriv(self):
return self.query('0:LOOP:D')
def write_prop(self, value):
return self.query('0:LOOP:P', value)
def write_integ(self, value):
return self.query('0:LOOP:I', value)
def write_deriv(self, value):
return self.query('0:LOOP:D', value)
def read_enable_pid_table(self):
return self.query('0:LOOP:PIDT').lower()
def write_enable_pid_table(self, value):
return self.query('0:LOOP:PIDT', value.upper()).lower()
def read_mode(self):
return MODE_MAP[self.query('0:LOOP:ENAB')]
def write_mode(self, value):
if value == 'manual':
self.status = 'IDLE', 'manual mode'
elif self.status[0] == 'IDLE':
self.status = 'IDLE', ''
return MODE_MAP[self.query('0:LOOP:ENAB', value)]
def write_target(self, value):
raise NotImplementedError
# def read_pid(self):
# # read all in one go, in order to reduce comm. traffic
# cmd = 'READ:DEV:%s:TEMP:LOOP:P:I:D' % self.slots.split(',')[0]
# reply = self._iodev.communicate(cmd)
# result = list(reply.split(':'))
# pid = result[6::2]
# del result[6::2]
# if ':'.join(result) != cmd:
# raise HardwareError('bad response %r to %r' % (reply, cmd))
# return dict(zip('pid', pid))
#
# def write_pid(self, value):
# # for simplicity use single writes
# return {k: self.query('LOOP:%s' % k.upper(), value[k]) for k in 'pid'}
class TemperatureLoop(Loop, TemperatureSensor, Drivable):
channel_type = 'TEMP,HTR'
heater_limit = Parameter('heater output limit', FloatRange(0, 100, unit='W'), readonly=False)
heater_resistivity = Parameter('heater resistivity', FloatRange(10, 1000, unit='Ohm'), readonly=False)
ramp = Parameter('ramp rate', FloatRange(0, unit='K/min'), readonly=False)
enable_ramp = Parameter('enable ramp rate', EnumType(off=0, on=1), readonly=False)
auto_flow = Parameter('enable auto flow', EnumType(off=0, on=1), readonly=False)
heater_output = Parameter('heater output', FloatRange(0, 100, unit='W'), readonly=False)
def read_heater_limit(self):
return self.query('HTR:VLIM') ** 2 / self.heater_resistivity
def write_heater_limit(self, value):
result = self.query('HTR:VLIM', math.sqrt(value * self.heater_resistivity))
return result ** 2 / self.heater_resistivity
def read_heater_resistivity(self):
value = self.query('HTR:RES')
if value:
return value
return self.heater_resistivity
def write_heater_resistivity(self, value):
return self.query('HTR:RES', value)
def read_enable_ramp(self):
return self.query('TEMP:LOOP:RENA').lower()
def write_enable_ramp(self, value):
return self.query('TEMP:LOOP:RENA', EnumType(off=0, on=1)(value).name).lower()
def read_auto_flow(self):
return self.query('TEMP:LOOP:FAUT').lower()
def write_auto_flow(self, value):
return self.query('TEMP:LOOP:FAUT', EnumType(off=0, on=1)(value).name).lower()
def read_ramp(self):
return self.query('TEMP:LOOP:RSET')
def write_ramp(self, value):
if not value:
self.write_enable_ramp(0)
return 0
if value:
self.write_enable_ramp(1)
return self.query('TEMP:LOOP:RSET', value)
def read_target(self):
# TODO: check about working setpoint
return self.query('TEMP:LOOP:TSET')
def write_target(self, value):
if self.mode != 'pid':
self.log.warning('switch to pid loop mode')
self.write_mode('pid')
self.reset_progress(self.value, value)
return self.query('TEMP:LOOP:TSET', value)
def read_heater_output(self):
# TODO: check that this really works, else next line
return self.query('HTR:SIG:POWR')
# return self.query('HTR:SIG:VOLT') ** 2 / self.heater_resistivity
def write_heater_output(self, value):
if self.mode != 'manual':
self.log.warning('switch to manual heater mode')
self.write_mode('manual')
return self.query('HTR:SIG:VOLT', math.sqrt(value * self.heater_resistivity))
class PressureSensor(MercuryChannel, Readable):
channel_type = 'PRES'
value = Parameter(unit='mbar')
def read_value(self):
return self.query('PRES:SIG:PRES')
class PressureLoop(Loop, PressureSensor, Drivable):
channel_type = 'PRES,AUX'
valve_pos = Parameter('valve position', FloatRange(0, 100, unit='%'), readonly=False)
def read_valve_pos(self):
return self.query('AUX:SIG:PERC')
def write_valve_pos(self, value):
if self.mode != 'manual':
self.log.warning('switch to manual valve mode')
self.write_mode('manual')
return self.query('AUX:SIG:PERC', value)
def write_target(self, value):
self.reset_progress(self.value, value)
return self.query('PRES:LOOP:PRST', value)
class HeLevel(MercuryChannel, Readable):
channel_type = 'LVL'
sample_rate = Parameter('_', EnumType(slow=0, fast=1), readonly=False, poll=True)
hysteresis = Parameter('hysteresis for detection of increase', FloatRange(0, 100, unit='%'), readonly=False)
fast_timeout = Parameter('timeout for switching to slow', FloatRange(0, unit='sec'), readonly=False)
_min_level = 200
_max_level = -100
_last_increase = None # None when in slow mode, last increase time in fast mode
def check_rate(self, sample_rate):
"""check changes in rate
:param sample_rate: (int or enum) 0: slow, 1: fast
initialize affected attributes
"""
if sample_rate != 0: # fast
if not self._last_increase:
self._last_increase = time.time()
self._max_level = -100
elif self._last_increase:
self._last_increase = None
self._min_level = 200
return sample_rate
def read_sample_rate(self):
return self.check_rate(SAMPLE_RATE[self.query('LVL:HEL:PULS:SLOW')])
def write_sample_rate(self, value):
self.check_rate(value)
return SAMPLE_RATE[self.query('LVL:HEL:PULS:SLOW', SAMPLE_RATE[value])]
def read_value(self):
level = self.query('LVL:SIG:HEL:LEV')
# handle automatic switching depending on increase
now = time.time()
if self._last_increase: # fast mode
if level > self._max_level:
self._last_increase = now
self._max_level = level
elif now > self._last_increase + self.fast_timeout:
# no increase since fast timeout -> slow
self.write_sample_rate('slow')
else:
if level > self._min_level + self.hysteresis:
# substantial increase -> fast
self.write_sample_rate('fast')
else:
self._min_level = min(self._min_level, level)
return level
class N2Level(MercuryChannel, Readable):
channel_type = 'LVL'
def read_value(self):
return self.query('LVL:SIG:NIT:LEV')
class MagnetOutput(MercuryChannel, Drivable):
pass