improvements on PPMS and LS370

- PPMS: improved machanism for 10 K waiting
- LS370: fixed an issue with auto range
+ LS370: show test for all status bits

Change-Id: Ia6454141917893f0e5c6c4351df3a864942bb629
Reviewed-on: https://forge.frm2.tum.de/review/c/sine2020/secop/playground/+/23495
Tested-by: Jenkins Automated Tests <pedersen+jenkins@frm2.tum.de>
Reviewed-by: Markus Zolliker <markus.zolliker@psi.ch>
This commit is contained in:
zolliker 2020-06-25 12:02:17 +02:00
parent a520e6e1e4
commit aa4c8f1f04
5 changed files with 148 additions and 110 deletions

View File

@ -247,3 +247,14 @@ def getfqdn(name=''):
def getGeneralConfig():
return CONFIG
def formatStatusBits(sword, labels, start=0):
"""Return a list of labels according to bit state in `sword` starting
with bit `start` and the first label in `labels`.
"""
result = []
for i, lbl in enumerate(labels, start):
if sword & (1 << i) and lbl:
result.append(lbl)
return result

View File

@ -27,6 +27,7 @@ from secop.metaclass import Done
from secop.datatypes import FloatRange, IntRange, EnumType, BoolType
from secop.stringio import HasIodev
from secop.poller import Poller, REGULAR
from secop.lib import formatStatusBits
import secop.iohandler
Status = Drivable.Status
@ -49,10 +50,7 @@ filterhdl = IOHandler('filter', 'FILTER?%(channel)d', '%d,%d,%d')
scan = IOHandler('scan', 'SCAN?', '%d,%d')
STATUS_TEXT = {0: ''}
for bit, text in enumerate('CS_OVL VCM_OVL VMIX_OVL R_OVER R_UNDER T_OVER T_UNDER'.split()):
for i in range(1 << bit, 2 << bit):
STATUS_TEXT[i] = text
STATUS_BIT_LABELS = 'CS_OVL VCM_OVL VMIX_OVL VDIF_OVL R_OVER R_UNDER T_OVER T_UNDER'.split()
class StringIO(secop.stringio.StringIO):
@ -177,13 +175,13 @@ class ResChannel(HasIodev, Readable):
return result
def read_status(self):
if self.channel != self._main.channel:
return Done
if not self.enabled:
return [self.Status.DISABLED, 'disabled']
if self.channel != self._main.channel:
return Done
result = int(self.sendRecv('RDGST?%d' % self.channel))
result &= 0x37 # mask T_OVER and T_UNDER (change this when implementing temperatures instead of resistivities)
statustext = STATUS_TEXT[result]
statustext = ' '.join(formatStatusBits(result, STATUS_BIT_LABELS))
if statustext:
return [self.Status.ERROR, statustext]
return [self.Status.IDLE, '']
@ -191,10 +189,11 @@ class ResChannel(HasIodev, Readable):
def analyze_rdgrng(self, iscur, exc, rng, autorange, excoff):
result = dict(range=rng)
if autorange:
result['auotrange'] = 'hard'
elif self.autorange == 'hard':
result['autorange'] = 'soft'
result['autorange'] = 'hard'
#elif self.autorange == 'hard':
# result['autorange'] = 'soft'
# else: do not change autorange
self.log.info('%s range %r %r %r' % (self.name, rng, autorange, self.autorange))
if excoff:
result.update(iexc=0, vexc=0)
elif iscur:
@ -225,6 +224,7 @@ class ResChannel(HasIodev, Readable):
if change.autorange == 'soft':
if rng < self.minrange:
rng = self.minrange
self.autorange = change.autorange
return iscur, exc, rng, autorange, excoff
def analyze_inset(self, on, dwell, pause, curve, tempco):

View File

@ -1,77 +1,71 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
# *****************************************************************************
"""a very simple simulator for a LakeShore Model 370"""
from secop.modules import Communicator
#from secop.lib import mkthread
class Ls370Sim(Communicator):
CHANNEL_COMMANDS = [
('RDGR?%d', '1.0'),
('RDGST?%d', '0'),
('RDGRNG?%d', '0,5,5,0,0'),
('INSET?%d', '1,5,5,0,0'),
('FILTER?%d', '1,5,80'),
]
OTHER_COMMANDS = [
('*IDN?', 'LSCI,MODEL370,370184,05302003'),
('SCAN?', '3,1'),
]
def earlyInit(self):
self._data = dict(self.OTHER_COMMANDS)
for fmt, v in self.CHANNEL_COMMANDS:
for chan in range(1,17):
self._data[fmt % chan] = v
# mkthread(self.run)
def do_communicate(self, command):
# simulation part, time independent
for channel in range(1,17):
_, _, _, _, excoff = self._data['RDGRNG?%d' % channel].split(',')
if excoff == '1':
self._data['RDGST?%d' % channel] = '4'
else:
self._data['RDGST?%d' % channel] = '0'
chunks = command.split(';')
reply = []
for chunk in chunks:
if '?' in chunk:
reply.append(self._data[chunk])
else:
for nqarg in (1,0):
if nqarg == 0:
qcmd, arg = chunk.split(' ', 1)
qcmd += '?'
else:
qcmd, arg = chunk.split(',', nqarg)
qcmd = qcmd.replace(' ', '?', 1)
if qcmd in self._data:
self._data[qcmd] = arg
break
#if command.startswith('R'):
# print('> %s\t< %s' % (command, reply))
return ';'.join(reply)
#def run(self):
# # time dependent simulation
# while True:
# time.sleep(1)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
# *****************************************************************************
"""a very simple simulator for a LakeShore Model 370"""
from secop.modules import Communicator
class Ls370Sim(Communicator):
CHANNEL_COMMANDS = [
('RDGR?%d', '1.0'),
('RDGST?%d', '0'),
('RDGRNG?%d', '0,5,5,0,0'),
('INSET?%d', '1,5,5,0,0'),
('FILTER?%d', '1,5,80'),
]
OTHER_COMMANDS = [
('*IDN?', 'LSCI,MODEL370,370184,05302003'),
('SCAN?', '3,1'),
]
def earlyInit(self):
self._data = dict(self.OTHER_COMMANDS)
for fmt, v in self.CHANNEL_COMMANDS:
for chan in range(1,17):
self._data[fmt % chan] = v
# mkthread(self.run)
def do_communicate(self, command):
# simulation part, time independent
for channel in range(1,17):
_, _, _, _, excoff = self._data['RDGRNG?%d' % channel].split(',')
if excoff == '1':
self._data['RDGST?%d' % channel] = '6'
else:
self._data['RDGST?%d' % channel] = '0'
chunks = command.split(';')
reply = []
for chunk in chunks:
if '?' in chunk:
reply.append(self._data[chunk])
else:
for nqarg in (1,0):
if nqarg == 0:
qcmd, arg = chunk.split(' ', 1)
qcmd += '?'
else:
qcmd, arg = chunk.split(',', nqarg)
qcmd = qcmd.replace(' ', '?', 1)
if qcmd in self._data:
self._data[qcmd] = arg
break
#if command.startswith('R'):
# print('> %s\t< %s' % (command, reply))
return ';'.join(reply)

View File

@ -209,8 +209,18 @@ class UserChannel(Channel):
'no':
Property('channel number',
datatype=IntRange(0, 0), export=False, default=0),
'linkenable':
Property('name of linked channel for enabling',
datatype=StringType(), export=False, default=''),
}
def write_enabled(self, enabled):
other = self._iodev.modules.get(self.linkenable, None)
if other:
other.enabled = enabled
return enabled
class DriverChannel(Channel):
drvout = IOHandler('drvout', 'DRVOUT? %(no)d', '%d,%g,%g')
@ -410,8 +420,11 @@ class Temp(PpmsMixin, Drivable):
Parameter('intermediate set point',
datatype=FloatRange(1.7, 402.0, unit='K'), handler=temp),
'ramp':
Parameter('ramping speed', readonly=False, handler=temp, default=0,
Parameter('ramping speed', readonly=False, default=0,
datatype=FloatRange(0, 20, unit='K/min')),
'workingramp':
Parameter('intermediate ramp value',
datatype=FloatRange(0, 20, unit='K/min'), handler=temp),
'approachmode':
Parameter('how to approach target!', readonly=False, handler=temp,
datatype=EnumType(ApproachMode)),
@ -458,6 +471,7 @@ class Temp(PpmsMixin, Drivable):
general_stop = False
_cool_deadline = 0
_wait_at10 = False
_ramp_at_limit = False
def update_value_status(self, value, packed_status):
"""update value and status"""
@ -469,10 +483,10 @@ class Temp(PpmsMixin, Drivable):
status = self.STATUS_MAP.get(status_code, (self.Status.ERROR, 'unknown status code %d' % status_code))
now = time.time()
if value > 11:
# when starting from T > 40, this will be 15 min.
# when starting from T > 50, this will be 15 min.
# when starting from lower T, it will be less
# when ramping with 2 K/min or less, the deadline is now
self._cool_deadline = max(self._cool_deadline, now + min(30, value - 10) * 30) # 30 sec / K
self._cool_deadline = max(self._cool_deadline, now + min(40, value - 10) * 30) # 30 sec / K
elif self._wait_at10:
if now > self._cool_deadline:
self._wait_at10 = False
@ -506,24 +520,40 @@ class Temp(PpmsMixin, Drivable):
self._expected_target_time = 0
self.status = status
def analyze_temp(self, setpoint, ramp, approachmode):
def analyze_temp(self, setpoint, workingramp, approachmode):
if (setpoint, workingramp, approachmode) == self._last_settings:
# update parameters only on change, as 'ramp' and 'approachmode' are
# not always sent to the hardware
return {}
self._last_settings = setpoint, workingramp, approachmode
if setpoint != 10 or not self._wait_at10:
self.log.debug('read back target %g %r' % (setpoint, self._wait_at10))
self.target = setpoint
result = dict(setpoint=setpoint)
# we update ramp and approachmode only at init
if self.ramp == 0:
result['ramp'] = ramp
result['approachmode'] = approachmode
if workingramp != 2 or not self._ramp_at_limit:
self.log.debug('read back ramp %g %r' % (workingramp, self._ramp_at_limit))
self.ramp = workingramp
result = dict(setpoint=setpoint, workingramp=workingramp)
self.log.debug('analyze_temp %r %r' % (result, (self.target, self.ramp)))
return result
def change_temp(self, change):
if 10 >= self.value > change.setpoint:
ramp = min(2, change.ramp)
print('ramplimit', change.ramp, self.value, ramp)
else:
ramp = change.ramp
self.calc_expected(change.setpoint, ramp)
return change.setpoint, ramp, change.approachmode
ramp = change.ramp
setpoint = change.setpoint
wait_at10 = False
ramp_at_limit = False
if self.value > 11:
if setpoint <= 10:
wait_at10 = True
setpoint = 10
elif self.value > setpoint:
if ramp >= 2:
ramp = 2
ramp_at_limit = True
self._wait_at10 = wait_at10
self._ramp_at_limit = ramp_at_limit
self.calc_expected(setpoint, ramp)
self.log.debug('change_temp v %r s %r r %r w %r l %r' % (self.value, setpoint, ramp, wait_at10, ramp_at_limit))
return setpoint, ramp, change.approachmode
def write_target(self, target):
self._stopped = False
@ -532,24 +562,22 @@ class Temp(PpmsMixin, Drivable):
self._status_before_change = self.status
self.status = (self.Status.BUSY, 'changed target')
self._last_change = time.time()
if self.value > 10 > target and self.ramp > 2:
self._wait_at10 = True
self.temp.write(self, 'setpoint', 10)
else:
self._wait_at10 = False
self.temp.write(self, 'setpoint', target)
self.temp.write(self, 'setpoint', target)
self.log.debug('write_target %s' % repr((self.setpoint, target, self._wait_at10)))
return target
def write_approachmode(self, value):
if self.isDriving():
self.temp.write(self, 'approachmode', value)
return Done
self.approachmode = value
return None # do not execute TEMP command, as this would trigger an unnecessary T change
def write_ramp(self, value):
if self.isDriving():
self.temp.write(self, 'ramp', value)
return Done
# self.ramp = value
return None # do not execute TEMP command, as this would trigger an unnecessary T change
def calc_expected(self, target, ramp):
@ -656,6 +684,7 @@ class Field(PpmsMixin, Drivable):
self.status = status
def analyze_field(self, target, ramp, approachmode, persistentmode):
# print('last_settings tt %s' % repr(self._last_settings))
if (target, ramp, approachmode, persistentmode) == self._last_settings:
# we update parameters only on change, as 'ramp' and 'approachmode' are
# not always sent to the hardware
@ -669,6 +698,7 @@ class Field(PpmsMixin, Drivable):
def write_target(self, target):
if abs(self.target - self.value) <= 2e-5 and target == self.target:
self.target = target
return None # avoid ramping leads
self._status_before_change = self.status
self._stopped = False
@ -679,6 +709,7 @@ class Field(PpmsMixin, Drivable):
def write_persistentmode(self, mode):
if abs(self.target - self.value) <= 2e-5 and mode == self.persistentmode:
self.persistentmode = mode
return None # avoid ramping leads
self._last_change = time.time()
self._status_before_change = self.status
@ -688,6 +719,7 @@ class Field(PpmsMixin, Drivable):
return Done
def write_ramp(self, value):
self.ramp = value
if self.isDriving():
self.field.write(self, 'ramp', value)
return Done
@ -805,6 +837,7 @@ class Position(PpmsMixin, Drivable):
if self.isDriving():
self.move.write(self, 'speed', value)
return Done
self.speed = value
return None # do not execute MOVE command, as this would trigger an unnecessary move
def do_stop(self):

View File

@ -53,7 +53,7 @@ class PpmsSim:
def __init__(self):
self.status = NamedList('t mf ch pos', 1, 1, 1, 1)
self.st = 0x1111
self.t = 200
self.t = 15
self.temp = NamedList('target ramp amode', 200., 1, 0, fast=self.t, delay=10)
self.mf = 100
self.field = NamedList('target ramp amode pmode', 0, 50, 0, 0)