improve mercury temperature loop

- remove appearance of Done
- add auto flow
- try up to 3 times in 'change' method if read back does not match

Change-Id: I98928307bda87190d34aed663023b157311d4495
Reviewed-on: https://forge.frm2.tum.de/review/c/secop/frappy/+/30981
Tested-by: Jenkins Automated Tests <pedersen+jenkins@frm2.tum.de>
Reviewed-by: Markus Zolliker <markus.zolliker@psi.ch>
This commit is contained in:
zolliker 2023-05-01 11:26:52 +02:00
parent 6c02f37bbb
commit 5784aa0f5d

View File

@ -27,7 +27,7 @@ import time
from frappy.core import Drivable, HasIO, Writable, \
Parameter, Property, Readable, StringIO, Attached, IDLE, nopoll
from frappy.datatypes import EnumType, FloatRange, StringType, StructOf, BoolType
from frappy.datatypes import EnumType, FloatRange, StringType, StructOf, BoolType, TupleOf
from frappy.errors import HardwareError
from frappy_psi.convergence import HasConvergence
from frappy.lib.enum import Enum
@ -38,6 +38,7 @@ SELF = 0
def as_float(value):
"""converts string (with unit) to float and float to string"""
if isinstance(value, str):
return float(VALUE_UNIT.match(value).group(1))
return f'{value:g}'
@ -61,7 +62,7 @@ fast_slow = Mapped(ON=0, OFF=1) # maps OIs slow=ON/fast=OFF to sample_rate.slow
class IO(StringIO):
identification = [('*IDN?', r'IDN:OXFORD INSTRUMENTS:MERCURY*')]
identification = [('*IDN?', r'IDN:OXFORD INSTRUMENTS:*')]
class MercuryChannel(HasIO):
@ -70,7 +71,6 @@ class MercuryChannel(HasIO):
example: DB6.T1,DB1.H1
slot ids for sensor (and control output)''',
StringType())
channel_name = Parameter('mercury nick name', StringType(), default='', update_unchanged='never')
channel_type = '' #: channel type(s) for sensor (and control) e.g. TEMP,HTR or PRES,AUX
def _complete_adr(self, adr):
@ -83,7 +83,7 @@ class MercuryChannel(HasIO):
return f'DEV:{slot}:{head}{sep}{tail}'
return adr
def multiquery(self, adr, names=(), convert=as_float):
def multiquery(self, adr, names=(), convert=as_float, debug=None):
"""get parameter(s) in mercury syntax
:param adr: the 'address part' of the SCPI command
@ -100,25 +100,36 @@ class MercuryChannel(HasIO):
self.slot='DB5.P1,DB3.G1' # -> take second slot
-> query command will be READ:DEV:DB3.G1:PRES:SIG:PERC
"""
# TODO: if the need arises: allow convert to be a list
adr = self._complete_adr(adr)
cmd = f"READ:{adr}:{':'.join(names)}"
reply = self.communicate(cmd)
head = f'STAT:{adr}:'
try:
assert reply.startswith(head)
replyiter = iter(reply[len(head):].split(':'))
keys, result = zip(*zip(replyiter, replyiter))
assert keys == tuple(names)
return tuple(convert(r) for r in result)
except (AssertionError, AttributeError, ValueError):
raise HardwareError(f'invalid reply {reply!r} to cmd {cmd!r}') from None
msg = ''
for _ in range(3):
if msg:
self.log.warning('%s', msg)
reply = self.communicate(cmd)
if debug is not None:
debug.append(reply)
head = f'STAT:{adr}:'
try:
assert reply.startswith(head)
replyiter = iter(reply[len(head):].split(':'))
keys, result = zip(*zip(replyiter, replyiter))
assert keys == tuple(names)
return tuple(convert(r) for r in result)
except (AssertionError, AttributeError, ValueError):
time.sleep(0.1) # in case this was the answer of a previous command
msg = f'invalid reply {reply!r} to cmd {cmd!r}'
raise HardwareError(msg) from None
def multichange(self, adr, values, convert=as_float):
def multichange(self, adr, values, convert=as_float, tolerance=0, n_retry=3):
"""set parameter(s) in mercury syntax
:param adr: as in see multiquery method
:param values: [(name1, value1), (name2, value2) ...]
:param convert: a converter function (converts given value to string and replied string to value)
:param tolerance: tolerance for readback check
:param n_retry: number of retries or 0 for no readback check
:return: the values as tuple
Example:
@ -128,21 +139,41 @@ class MercuryChannel(HasIO):
self.slot='DB6.T1,DB1.H1' # and take first slot
-> change command will be SET:DEV:DB6.T1:TEMP:LOOP:P:5:I:2:D:0
"""
# TODO: if the need arises: allow convert and or tolerance to be a list
adr = self._complete_adr(adr)
params = [f'{k}:{convert(v)}' for k, v in values]
cmd = f"SET:{adr}:{':'.join(params)}"
reply = self.communicate(cmd)
head = f'STAT:SET:{adr}:'
try:
assert reply.startswith(head)
replyiter = iter(reply[len(head):].split(':'))
keys, result, valid = zip(*zip(replyiter, replyiter, replyiter))
assert keys == tuple(k for k, _ in values)
assert any(v == 'VALID' for v in valid)
return tuple(convert(r) for r in result)
except (AssertionError, AttributeError, ValueError) as e:
raise HardwareError(f'invalid reply {reply!r} to cmd {cmd!r}') from e
for _ in range(max(1, n_retry)): # try n_retry times or until readback result matches
reply = self.communicate(cmd)
head = f'STAT:SET:{adr}:'
try:
assert reply.startswith(head)
replyiter = iter(reply[len(head):].split(':'))
# reshuffle reply=(k1, r1, v1, k2, r2, v1) --> keys = (k1, k2), result = (r1, r2), valid = (v1, v2)
keys, result, valid = zip(*zip(replyiter, replyiter, replyiter))
assert keys == tuple(k for k, _ in values)
assert any(v == 'VALID' for v in valid)
result = tuple(convert(r) for r in result)
except (AssertionError, AttributeError, ValueError) as e:
time.sleep(0.1) # in case of missed replies this might help to skip garbage
raise HardwareError(f'invalid reply {reply!r} to cmd {cmd!r}') from e
if n_retry == 0:
return [v[1] for v in values] # no readback check
keys = [v[0] for v in values]
debug = []
readback = self.multiquery(adr, keys, convert, debug)
for k, r, b in zip(keys, result, readback):
if convert is as_float:
tol = max(abs(r) * 1e-3, abs(b) * 1e-3, tolerance)
if abs(r - b) > tol:
break
elif r != b:
break
else:
return readback
self.log.warning('sent: %s', cmd)
self.log.warning('got: %s', debug[0])
return readback
def query(self, adr, convert=as_float):
"""query a single parameter
@ -152,14 +183,9 @@ class MercuryChannel(HasIO):
adr, _, name = adr.rpartition(':')
return self.multiquery(adr, [name], convert)[0]
def change(self, adr, value, convert=as_float):
def change(self, adr, value, convert=as_float, tolerance=0, n_retry=3):
adr, _, name = adr.rpartition(':')
return self.multichange(adr, [(name, value)], convert)[0]
def read_channel_name(self):
if self.channel_name:
return self.channel_name # channel name will not change
return self.query('0:NICK', as_string)
return self.multichange(adr, [(name, value)], convert, tolerance, n_retry)[0]
class TemperatureSensor(MercuryChannel, Readable):
@ -176,26 +202,29 @@ class TemperatureSensor(MercuryChannel, Readable):
class HasInput(MercuryChannel):
controlled_by = Parameter('source of target value', EnumType(members={'self': SELF}), default=0)
target = Parameter(readonly=False)
input_modules = ()
# do not know why this? target = Parameter(readonly=False)
input_callbacks = ()
def add_input(self, modobj):
if not self.input_modules:
self.input_modules = []
self.input_modules.append(modobj)
def register_input(self, name, control_off):
"""register input
:param name: the name of the module (for controlled_by enum)
:param control_off: a method on the input module to switch off control
"""
if not self.input_callbacks:
self.input_callbacks = []
self.input_callbacks.append(control_off)
prev_enum = self.parameters['controlled_by'].datatype._enum
# add enum member, using autoincrement feature of Enum
self.parameters['controlled_by'].datatype = EnumType(Enum(prev_enum, **{modobj.name: None}))
self.parameters['controlled_by'].datatype = EnumType(Enum(prev_enum, **{name: None}))
def write_controlled_by(self, value):
if self.controlled_by == value:
return value
self.controlled_by = value
if value == SELF:
self.log.warning('switch to manual mode')
for input_module in self.input_modules:
if input_module.control_active:
input_module.write_control_active(False)
for control_off in self.input_callbacks:
control_off()
return value
@ -213,12 +242,17 @@ class Loop(HasConvergence, MercuryChannel, Drivable):
def initModule(self):
super().initModule()
if self.output_module:
self.output_module.add_input(self)
self.output_module.register_input(self.name, self.control_off)
def control_off(self):
if self.control_active:
self.log.warning('switch to manual mode')
self.write_control_active(False)
def set_output(self, active):
if active:
if self.output_module and self.output_module.controlled_by != self.name:
self.output_module.controlled_by = self.name
self.output_module.write_controlled_by(self.name)
else:
if self.output_module and self.output_module.controlled_by != SELF:
self.output_module.write_controlled_by(SELF)
@ -231,7 +265,7 @@ class Loop(HasConvergence, MercuryChannel, Drivable):
self.set_output(True)
else:
self.log.warning('switch loop control on')
self.write_control_active(True)
self.write_control_active(True)
self.target = target
self.start_state()
@ -312,24 +346,23 @@ class HeaterOutput(HasInput, MercuryChannel, Writable):
return volt * current
def read_target(self):
if self.controlled_by != 0 and self.target:
return 0
if self._last_target is not None:
if self.controlled_by != 0 or self._last_target is not None:
# read back only when not yet initialized
return self.target
self._volt_target = self.query('HTR:SIG:VOLT')
self.resistivity = max(10, self.query('HTR:RES'))
self._last_target = self._volt_target ** 2 / max(10, self.resistivity)
return self._last_target
def set_target(self, value):
def set_target(self, target):
"""set the target without switching to manual
might be used by a software loop
"""
self._volt_target = math.sqrt(value * self.resistivity)
self._volt_target = math.sqrt(target * self.resistivity)
self.change('HTR:SIG:VOLT', self._volt_target)
self._last_target = value
return value
self._last_target = target
return target
def write_target(self, value):
self.write_controlled_by(SELF)
@ -337,26 +370,29 @@ class HeaterOutput(HasInput, MercuryChannel, Writable):
class TemperatureLoop(TemperatureSensor, Loop, Drivable):
channel_type = 'TEMP,HTR'
output_module = Attached(HeaterOutput, mandatory=False)
ramp = Parameter('ramp rate', FloatRange(0, unit='K/min'), readonly=False)
channel_type = 'TEMP'
output_module = Attached(HasInput, mandatory=False)
ramp = Parameter('ramp rate', FloatRange(0, unit='$/min'), readonly=False)
enable_ramp = Parameter('enable ramp rate', BoolType(), readonly=False)
setpoint = Parameter('working setpoint (differs from target when ramping)', FloatRange(0, unit='$'))
auto_flow = Parameter('enable auto flow', BoolType(), readonly=False)
tolerance = Parameter(default=0.1)
_last_setpoint_change = None
ENABLE = 'TEMP:LOOP:ENAB'
ENABLE_RAMP = 'TEMP:LOOP:RENA'
RAMP_RATE = 'TEMP:LOOP:RSET'
def doPoll(self):
super().doPoll()
self.read_setpoint()
def read_control_active(self):
active = self.query('TEMP:LOOP:ENAB', off_on)
active = self.query(self.ENABLE, off_on)
self.set_output(active)
return active
def write_control_active(self, value):
self.set_output(value)
return self.change('TEMP:LOOP:ENAB', value, off_on)
return self.change(self.ENABLE, value, off_on)
@nopoll # polled by read_setpoint
def read_target(self):
@ -390,19 +426,24 @@ class TemperatureLoop(TemperatureSensor, Loop, Drivable):
return self.target
def read_enable_ramp(self):
return self.query('TEMP:LOOP:RENA', off_on)
return self.query(self.ENABLE_RAMP, off_on)
def write_enable_ramp(self, value):
return self.change('TEMP:LOOP:RENA', value, off_on)
return self.change(self.ENABLE_RAMP, value, off_on)
def read_auto_flow(self):
return self.query('TEMP:LOOP:FAUT', off_on)
def write_auto_flow(self, value):
return self.change('TEMP:LOOP:FAUT', value, off_on)
def set_output(self, active):
if active:
if self.output_module and self.output_module.controlled_by != self.name:
self.output_module.write_controlled_by(self.name)
else:
if self.output_module and self.output_module.controlled_by != SELF:
self.output_module.write_controlled_by(SELF)
status = IDLE, 'control inactive'
if self.status != status:
self.status = status
def read_ramp(self):
result = self.query('TEMP:LOOP:RSET')
result = self.query(self.RAMP_RATE)
return min(9e99, result)
def write_ramp(self, value):
@ -411,11 +452,11 @@ class TemperatureLoop(TemperatureSensor, Loop, Drivable):
self.write_enable_ramp(0)
return 0
if value >= 9e99:
self.change('TEMP:LOOP:RSET', 'inf', as_string)
self.change(self.RAMP_RATE, 'inf', as_string)
self.write_enable_ramp(0)
return 9e99
self.write_enable_ramp(1)
return self.change('TEMP:LOOP:RSET', max(1e-4, value))
return self.change(self.RAMP_RATE, max(1e-4, value))
class PressureSensor(MercuryChannel, Readable):
@ -451,9 +492,10 @@ class ValvePos(HasInput, MercuryChannel, Drivable):
return self.change('PRES:LOOP:FSET', value)
class PressureLoop(PressureSensor, Loop, Drivable):
channel_type = 'PRES,AUX'
class PressureLoop(HasInput, PressureSensor, Loop, Drivable):
channel_type = 'PRES'
output_module = Attached(ValvePos, mandatory=False)
tolerance = Parameter(default=0.1)
def read_control_active(self):
active = self.query('PRES:LOOP:FAUT', off_on)
@ -467,10 +509,60 @@ class PressureLoop(PressureSensor, Loop, Drivable):
def read_target(self):
return self.query('PRES:LOOP:PRST')
def set_target(self, target):
"""set the target without switching to manual
might be used by a software loop
"""
self.change('PRES:LOOP:PRST', target)
super().set_target(target)
def write_target(self, value):
target = self.change('PRES:LOOP:PRST', value)
self.set_target(target)
return self.target
self.write_controlled_by(SELF)
self.set_target(value)
return value
class HasAutoFlow:
needle_valve = Attached(PressureLoop, mandatory=False)
auto_flow = Parameter('enable auto flow', BoolType(), readonly=False, default=0)
flowpars = Parameter('Tdif(min, max), FlowSet(min, max)',
TupleOf(TupleOf(FloatRange(unit='K'), FloatRange(unit='K')),
TupleOf(FloatRange(unit='mbar'), FloatRange(unit='mbar'))),
readonly=False, default=((1, 5), (4, 20)))
def read_value(self):
value = super().read_value()
if self.auto_flow:
(dmin, dmax), (fmin, fmax) = self.flowpars
flowset = min(dmax - dmin, max(0, value - self.target - dmin)) / (dmax - dmin) * (fmax - fmin) + fmin
self.needle_valve.set_target(flowset)
return value
def initModule(self):
super().initModule()
if self.needle_valve:
self.needle_valve.register_input(self.name, self.auto_flow_off)
def write_auto_flow(self, value):
if value:
if self.needle_valve and self.needle_valve.controlled_by != self.name:
self.needle_valve.write_controlled_by(self.name)
else:
if self.needle_valve and self.needle_valve.controlled_by != SELF:
self.needle_valve.write_controlled_by(SELF)
_, (fmin, _) = self.flowpars
self.needle_valve.write_target(fmin)
return value
def auto_flow_off(self):
if self.auto_flow:
self.log.warning('switch auto flow off')
self.write_auto_flow(False)
class TemperatureAutoFlow(HasAutoFlow, TemperatureLoop):
pass
class HeLevel(MercuryChannel, Readable):
@ -480,6 +572,7 @@ class HeLevel(MercuryChannel, Readable):
(when filling) and slow (when consuming). We have to handle this by software.
"""
channel_type = 'LVL'
value = Parameter(unit='%')
sample_rate = Parameter('_', EnumType(slow=0, fast=1), readonly=False)
hysteresis = Parameter('hysteresis for detection of increase', FloatRange(0, 100, unit='%'),
default=5, readonly=False)
@ -533,9 +626,7 @@ class HeLevel(MercuryChannel, Readable):
class N2Level(MercuryChannel, Readable):
channel_type = 'LVL'
value = Parameter(unit='%')
def read_value(self):
return self.query('LVL:SIG:NIT:LEV')
# TODO: magnet power supply