improve mercury driver

less fancy but more readable commands

Change-Id: Ifcc6a03199167179d984235c9b1bc7e14c60b51b
Reviewed-on: https://forge.frm2.tum.de/review/c/secop/frappy/+/31008
Tested-by: Jenkins Automated Tests <pedersen+jenkins@frm2.tum.de>
Reviewed-by: Markus Zolliker <markus.zolliker@psi.ch>
This commit is contained in:
zolliker 2023-05-03 14:50:32 +02:00
parent 74729872a8
commit a9fe8577c3

View File

@ -28,7 +28,7 @@ import time
from frappy.core import Drivable, HasIO, Writable, \
Parameter, Property, Readable, StringIO, Attached, IDLE, nopoll
from frappy.datatypes import EnumType, FloatRange, StringType, StructOf, BoolType, TupleOf
from frappy.errors import HardwareError
from frappy.errors import HardwareError, ProgrammingError, ConfigError
from frappy_psi.convergence import HasConvergence
from frappy.lib.enum import Enum
@ -66,39 +66,43 @@ class IO(StringIO):
class MercuryChannel(HasIO):
slot = Property('''slot uids
example: DB6.T1,DB1.H1
slot ids for sensor (and control output)''',
StringType())
channel_type = '' #: channel type(s) for sensor (and control) e.g. TEMP,HTR or PRES,AUX
slot = Property('comma separated slot id(s), e.g. DB6.T1', StringType())
kind = '' #: used slot kind(s)
slots = () #: dict[<kind>] of <slot>
def earlyInit(self):
super().earlyInit()
self.kinds = self.kind.split(',')
slots = self.slot.split(',')
if len(slots) != len(self.kinds):
raise ConfigError(f'slot needs {len(self.kinds)} comma separated names')
self.slots = dict(zip(self.kinds, slots))
self.setProperty('slot', slots[0])
def _complete_adr(self, adr):
"""complete address from channel_type and slot"""
head, sep, tail = adr.partition(':')
for i, (channel_type, slot) in enumerate(zip(self.channel_type.split(','), self.slot.split(','))):
if head == str(i):
return f'DEV:{slot}:{channel_type}{sep}{tail}'
if head == channel_type:
return f'DEV:{slot}:{head}{sep}{tail}'
"""insert slot to adr"""
spl = adr.split(':')
if spl[0] == 'DEV':
if spl[1] == '':
spl[1] = self.slots[spl[2]]
return ':'.join(spl)
elif spl[0] != 'SYS':
raise ProgrammingError('using old style adr?')
return adr
def multiquery(self, adr, names=(), convert=as_float, debug=None):
"""get parameter(s) in mercury syntax
:param adr: the 'address part' of the SCPI command
the DEV:<slot> is added automatically, when adr starts with the channel type
in addition, when adr starts with '0:' or '1:', channel type and slot are added
READ: is added automatically, slot is inserted when adr starts with DEV::
:param names: the SCPI names of the parameter(s), for example ['TEMP']
:param convert: a converter function (converts replied string to value)
:return: the values as tuple
Example:
adr='AUX:SIG'
Example (kind=PRES,AUX slot=DB5.P1,DB3.G1):
adr='DEV::AUX:SIG'
names = ('PERC',)
self.channel_type='PRES,AUX' # adr starts with 'AUX'
self.slot='DB5.P1,DB3.G1' # -> take second slot
-> query command will be READ:DEV:DB3.G1:PRES:SIG:PERC
-> query command will be READ:DEV:DB3.G1:AUX:SIG:PERC
"""
# TODO: if the need arises: allow convert to be a list
adr = self._complete_adr(adr)
@ -125,18 +129,16 @@ class MercuryChannel(HasIO):
def multichange(self, adr, values, convert=as_float, tolerance=0, n_retry=3):
"""set parameter(s) in mercury syntax
:param adr: as in see multiquery method
:param adr: as in multiquery method. SET: is added automatically
:param values: [(name1, value1), (name2, value2) ...]
:param convert: a converter function (converts given value to string and replied string to value)
:param tolerance: tolerance for readback check
:param n_retry: number of retries or 0 for no readback check
:return: the values as tuple
Example:
adr='0:LOOP'
Example (kind=TEMP, slot=DB6.T1:
adr='DEV::TEMP:LOOP'
values = [('P', 5), ('I', 2), ('D', 0)]
self.channel_type='TEMP,HTR' # adr starts with 0: take TEMP
self.slot='DB6.T1,DB1.H1' # and take first slot
-> change command will be SET:DEV:DB6.T1:TEMP:LOOP:P:5:I:2:D:0
"""
# TODO: if the need arises: allow convert and or tolerance to be a list
@ -178,7 +180,7 @@ class MercuryChannel(HasIO):
def query(self, adr, convert=as_float):
"""query a single parameter
'adr' and 'convert' areg
'adr' and 'convert' as in multiquery
"""
adr, _, name = adr.rpartition(':')
return self.multiquery(adr, [name], convert)[0]
@ -189,15 +191,15 @@ class MercuryChannel(HasIO):
class TemperatureSensor(MercuryChannel, Readable):
channel_type = 'TEMP'
kind = 'TEMP'
value = Parameter(unit='K')
raw = Parameter('raw value', FloatRange(unit='Ohm'))
def read_value(self):
return self.query('TEMP:SIG:TEMP')
return self.query('DEV::TEMP:SIG:TEMP')
def read_raw(self):
return self.query('TEMP:SIG:RES')
return self.query('DEV::TEMP:SIG:RES')
class HasInput(MercuryChannel):
@ -270,18 +272,18 @@ class Loop(HasConvergence, MercuryChannel, Drivable):
self.start_state()
def read_enable_pid_table(self):
return self.query('0:LOOP:PIDT', off_on)
return self.query(f'DEV::{self.kinds[0]}:LOOP:PIDT', off_on)
def write_enable_pid_table(self, value):
return self.change('0:LOOP:PIDT', value, off_on)
return self.change(f'DEV::{self.kinds[0]}:LOOP:PIDT', value, off_on)
def read_ctrlpars(self):
# read all in one go, in order to reduce comm. traffic
pid = self.multiquery('0:LOOP', ('P', 'I', 'D'))
pid = self.multiquery(f'DEV::{self.kinds[0]}:LOOP', ('P', 'I', 'D'))
return {k: float(v) for k, v in zip('pid', pid)}
def write_ctrlpars(self, value):
pid = self.multichange('0:LOOP', [(k, value[k.lower()]) for k in 'PID'])
pid = self.multichange(f'DEV::{self.kinds[0]}:LOOP', [(k, value[k.lower()]) for k in 'PID'])
return {k.lower(): v for k, v in zip('PID', pid)}
@ -293,7 +295,7 @@ class HeaterOutput(HasInput, MercuryChannel, Writable):
resistivity. As the measured heater current is available, the resistivity
will be adjusted automatically, when true_power is True.
"""
channel_type = 'HTR'
kind = 'HTR'
value = Parameter('heater output', FloatRange(unit='W'), readonly=False)
status = Parameter(update_unchanged='never')
target = Parameter('heater output', FloatRange(0, 100, unit='$'), readonly=False, update_unchanged='never')
@ -306,23 +308,23 @@ class HeaterOutput(HasInput, MercuryChannel, Writable):
_volt_target = None
def read_limit(self):
return self.query('HTR:VLIM') ** 2 / self.resistivity
return self.query('DEV::HTR:VLIM') ** 2 / self.resistivity
def write_limit(self, value):
result = self.change('HTR:VLIM', math.sqrt(value * self.resistivity))
result = self.change('DEV::HTR:VLIM', math.sqrt(value * self.resistivity))
return result ** 2 / self.resistivity
def read_resistivity(self):
if self.true_power:
return self.resistivity
return max(10, self.query('HTR:RES'))
return max(10.0, self.query('DEV::HTR:RES'))
def write_resistivity(self, value):
self.resistivity = self.change('HTR:RES', max(10, value))
self.resistivity = self.change('DEV::HTR:RES', max(10.0, value))
if self._last_target is not None:
if not self.true_power:
self._volt_target = math.sqrt(self._last_target * self.resistivity)
self.change('HTR:SIG:VOLT', self._volt_target)
self.change('DEV::HTR:SIG:VOLT', self._volt_target)
return self.resistivity
def read_status(self):
@ -332,9 +334,9 @@ class HeaterOutput(HasInput, MercuryChannel, Writable):
if self._last_target is None: # on init
self.read_target()
if not self.true_power:
volt = self.query('HTR:SIG:VOLT')
return volt ** 2 / max(10, self.resistivity)
volt, current = self.multiquery('HTR:SIG', ('VOLT', 'CURR'))
volt = self.query('DEV::HTR:SIG:VOLT')
return volt ** 2 / max(10.0, self.resistivity)
volt, current = self.multiquery('DEV::HTR:SIG', ('VOLT', 'CURR'))
if volt > 0 and current > 0.0001 and self._last_target:
res = volt / current
tol = res * max(max(0.0003, abs(volt - self._volt_target)) / volt, 0.0001 / current, 0.0001)
@ -342,16 +344,16 @@ class HeaterOutput(HasInput, MercuryChannel, Writable):
self.write_resistivity(round(res, 1))
if self.controlled_by == 0:
self._volt_target = math.sqrt(self._last_target * self.resistivity)
self.change('HTR:SIG:VOLT', self._volt_target)
self.change('DEV::HTR:SIG:VOLT', self._volt_target)
return volt * current
def read_target(self):
if self.controlled_by != 0 or self._last_target is not None:
# read back only when not yet initialized
return self.target
self._volt_target = self.query('HTR:SIG:VOLT')
self.resistivity = max(10, self.query('HTR:RES'))
self._last_target = self._volt_target ** 2 / max(10, self.resistivity)
self._volt_target = self.query('DEV::HTR:SIG:VOLT')
self.resistivity = max(10.0, self.query('DEV::HTR:RES'))
self._last_target = self._volt_target ** 2 / max(10.0, self.resistivity)
return self._last_target
def set_target(self, target):
@ -360,7 +362,7 @@ class HeaterOutput(HasInput, MercuryChannel, Writable):
might be used by a software loop
"""
self._volt_target = math.sqrt(target * self.resistivity)
self.change('HTR:SIG:VOLT', self._volt_target)
self.change('DEV::HTR:SIG:VOLT', self._volt_target)
self._last_target = target
return target
@ -370,13 +372,14 @@ class HeaterOutput(HasInput, MercuryChannel, Writable):
class TemperatureLoop(TemperatureSensor, Loop, Drivable):
channel_type = 'TEMP'
kind = 'TEMP'
output_module = Attached(HasInput, mandatory=False)
ramp = Parameter('ramp rate', FloatRange(0, unit='$/min'), readonly=False)
enable_ramp = Parameter('enable ramp rate', BoolType(), readonly=False)
setpoint = Parameter('working setpoint (differs from target when ramping)', FloatRange(0, unit='$'))
tolerance = Parameter(default=0.1)
_last_setpoint_change = None
# overridden in subclass frappy_psi.triton.TemperatureLoop
ENABLE = 'TEMP:LOOP:ENAB'
ENABLE_RAMP = 'TEMP:LOOP:RENA'
RAMP_RATE = 'TEMP:LOOP:RSET'
@ -386,23 +389,23 @@ class TemperatureLoop(TemperatureSensor, Loop, Drivable):
self.read_setpoint()
def read_control_active(self):
active = self.query(self.ENABLE, off_on)
active = self.query(f'DEV::{self.ENABLE}', off_on)
self.set_output(active)
return active
def write_control_active(self, value):
self.set_output(value)
return self.change(self.ENABLE, value, off_on)
return self.change(f'DEV::{self.ENABLE}', value, off_on)
@nopoll # polled by read_setpoint
def read_target(self):
if self.read_enable_ramp():
return self.target
self.setpoint = self.query('TEMP:LOOP:TSET')
self.setpoint = self.query('DEV::TEMP:LOOP:TSET')
return self.setpoint
def read_setpoint(self):
setpoint = self.query('TEMP:LOOP:TSET')
setpoint = self.query('DEV::TEMP:LOOP:TSET')
if self.enable_ramp:
if setpoint == self.setpoint:
# update target when working setpoint does no longer change
@ -417,7 +420,7 @@ class TemperatureLoop(TemperatureSensor, Loop, Drivable):
return setpoint
def write_target(self, value):
target = self.change('TEMP:LOOP:TSET', value)
target = self.change('DEV::TEMP:LOOP:TSET', value)
if self.enable_ramp:
self._last_setpoint_change = None
self.set_target(value)
@ -426,10 +429,10 @@ class TemperatureLoop(TemperatureSensor, Loop, Drivable):
return self.target
def read_enable_ramp(self):
return self.query(self.ENABLE_RAMP, off_on)
return self.query(f'DEV::{self.ENABLE_RAMP}', off_on)
def write_enable_ramp(self, value):
return self.change(self.ENABLE_RAMP, value, off_on)
return self.change(f'DEV::{self.ENABLE_RAMP}', value, off_on)
def set_output(self, active):
if active:
@ -443,7 +446,7 @@ class TemperatureLoop(TemperatureSensor, Loop, Drivable):
self.status = status
def read_ramp(self):
result = self.query(self.RAMP_RATE)
result = self.query(f'DEV::{self.RAMP_RATE}')
return min(9e99, result)
def write_ramp(self, value):
@ -452,23 +455,23 @@ class TemperatureLoop(TemperatureSensor, Loop, Drivable):
self.write_enable_ramp(0)
return 0
if value >= 9e99:
self.change(self.RAMP_RATE, 'inf', as_string)
self.change(f'DEV::{self.RAMP_RATE}', 'inf', as_string)
self.write_enable_ramp(0)
return 9e99
self.write_enable_ramp(1)
return self.change(self.RAMP_RATE, max(1e-4, value))
return self.change(f'DEV::{self.RAMP_RATE}', max(1e-4, value))
class PressureSensor(MercuryChannel, Readable):
channel_type = 'PRES'
kind = 'PRES'
value = Parameter(unit='mbar')
def read_value(self):
return self.query('PRES:SIG:PRES')
return self.query('DEV::PRES:SIG:PRES')
class ValvePos(HasInput, MercuryChannel, Drivable):
channel_type = 'PRES,AUX'
kind = 'PRES,AUX'
value = Parameter('value pos', FloatRange(unit='%'), readonly=False)
target = Parameter('valve pos target', FloatRange(0, 100, unit='$'), readonly=False)
@ -476,7 +479,7 @@ class ValvePos(HasInput, MercuryChannel, Drivable):
self.read_status()
def read_value(self):
return self.query('AUX:SIG:PERC')
return self.query(f'DEV:{self.slots["AUX"]}:AUX:SIG:PERC')
def read_status(self):
self.read_value()
@ -485,36 +488,36 @@ class ValvePos(HasInput, MercuryChannel, Drivable):
return 'BUSY', 'moving'
def read_target(self):
return self.query('PRES:LOOP:FSET')
return self.query('DEV::PRES:LOOP:FSET')
def write_target(self, value):
self.write_controlled_by(SELF)
return self.change('PRES:LOOP:FSET', value)
return self.change('DEV::PRES:LOOP:FSET', value)
class PressureLoop(HasInput, PressureSensor, Loop, Drivable):
channel_type = 'PRES'
kind = 'PRES'
output_module = Attached(ValvePos, mandatory=False)
tolerance = Parameter(default=0.1)
def read_control_active(self):
active = self.query('PRES:LOOP:FAUT', off_on)
active = self.query('DEV::PRES:LOOP:FAUT', off_on)
self.set_output(active)
return active
def write_control_active(self, value):
self.set_output(value)
return self.change('PRES:LOOP:FAUT', value, off_on)
return self.change('DEV::PRES:LOOP:FAUT', value, off_on)
def read_target(self):
return self.query('PRES:LOOP:PRST')
return self.query('DEV::PRES:LOOP:PRST')
def set_target(self, target):
"""set the target without switching to manual
might be used by a software loop
"""
self.change('PRES:LOOP:PRST', target)
self.change('DEV::PRES:LOOP:PRST', target)
super().set_target(target)
def write_target(self, value):
@ -571,7 +574,7 @@ class HeLevel(MercuryChannel, Readable):
The Mercury system does not support automatic switching between fast
(when filling) and slow (when consuming). We have to handle this by software.
"""
channel_type = 'LVL'
kind = 'LVL'
value = Parameter(unit='%')
sample_rate = Parameter('_', EnumType(slow=0, fast=1), readonly=False)
hysteresis = Parameter('hysteresis for detection of increase', FloatRange(0, 100, unit='%'),
@ -598,14 +601,14 @@ class HeLevel(MercuryChannel, Readable):
return sample_rate
def read_sample_rate(self):
return self.check_rate(self.query('LVL:HEL:PULS:SLOW', fast_slow))
return self.check_rate(self.query('DEV::LVL:HEL:PULS:SLOW', fast_slow))
def write_sample_rate(self, value):
self.check_rate(value)
return self.change('LVL:HEL:PULS:SLOW', value, fast_slow)
return self.change('DEV::LVL:HEL:PULS:SLOW', value, fast_slow)
def read_value(self):
level = self.query('LVL:SIG:HEL:LEV')
level = self.query('DEV::LVL:SIG:HEL:LEV')
# handle automatic switching depending on increase
now = time.time()
if self._last_increase: # fast mode
@ -625,8 +628,8 @@ class HeLevel(MercuryChannel, Readable):
class N2Level(MercuryChannel, Readable):
channel_type = 'LVL'
kind = 'LVL'
value = Parameter(unit='%')
def read_value(self):
return self.query('LVL:SIG:NIT:LEV')
return self.query('DEV::LVL:SIG:NIT:LEV')