mercury, ips, sea, triton, convergence

after gerrit

Change-Id: Iff14047ecc476589aef10c96fae9970133b8bd14
This commit is contained in:
2023-05-09 14:57:34 +02:00
parent 750b5a7794
commit 8039351395
12 changed files with 426 additions and 357 deletions

View File

@ -25,12 +25,13 @@ import math
import re
import time
from frappy.core import Drivable, HasIO, Writable, \
Parameter, Property, Readable, StringIO, Attached, IDLE, nopoll
from frappy.core import Drivable, HasIO, Writable, StatusType, \
Parameter, Property, Readable, StringIO, Attached, IDLE, RAMPING, nopoll
from frappy.datatypes import EnumType, FloatRange, StringType, StructOf, BoolType, TupleOf
from frappy.errors import HardwareError, ProgrammingError, ConfigError
from frappy.errors import HardwareError, ProgrammingError, ConfigError, RangeError
from frappy_psi.convergence import HasConvergence
from frappy.lib.enum import Enum
from frappy.states import Retry, Finish
from frappy.mixins import HasOutputModule, HasControlledBy
VALUE_UNIT = re.compile(r'(.*\d|inf)([A-Za-z/%]*)$')
@ -126,7 +127,7 @@ class MercuryChannel(HasIO):
msg = f'invalid reply {reply!r} to cmd {cmd!r}'
raise HardwareError(msg) from None
def multichange(self, adr, values, convert=as_float, tolerance=0, n_retry=3):
def multichange(self, adr, values, convert=as_float, tolerance=0, n_retry=3, lazy=False):
"""set parameter(s) in mercury syntax
:param adr: as in multiquery method. SET: is added automatically
@ -134,6 +135,7 @@ class MercuryChannel(HasIO):
:param convert: a converter function (converts given value to string and replied string to value)
:param tolerance: tolerance for readback check
:param n_retry: number of retries or 0 for no readback check
:param lazy: check direct reply only (no additional query)
:return: the values as tuple
Example (kind=TEMP, slot=DB6.T1:
@ -145,6 +147,7 @@ class MercuryChannel(HasIO):
adr = self._complete_adr(adr)
params = [f'{k}:{convert(v)}' for k, v in values]
cmd = f"SET:{adr}:{':'.join(params)}"
givenkeys = tuple(v[0] for v in values)
for _ in range(max(1, n_retry)): # try n_retry times or until readback result matches
reply = self.communicate(cmd)
head = f'STAT:SET:{adr}:'
@ -153,29 +156,35 @@ class MercuryChannel(HasIO):
replyiter = iter(reply[len(head):].split(':'))
# reshuffle reply=(k1, r1, v1, k2, r2, v1) --> keys = (k1, k2), result = (r1, r2), valid = (v1, v2)
keys, result, valid = zip(*zip(replyiter, replyiter, replyiter))
assert keys == tuple(k for k, _ in values)
assert keys == givenkeys
assert any(v == 'VALID' for v in valid)
result = tuple(convert(r) for r in result)
except (AssertionError, AttributeError, ValueError) as e:
time.sleep(0.1) # in case of missed replies this might help to skip garbage
raise HardwareError(f'invalid reply {reply!r} to cmd {cmd!r}') from e
if n_retry == 0:
return [v[1] for v in values] # no readback check
keys = [v[0] for v in values]
debug = []
readback = self.multiquery(adr, keys, convert, debug)
for k, r, b in zip(keys, result, readback):
return [v for _, v in values]
if lazy:
debug = [reply]
readback = [v for _, v in values]
else:
debug = []
readback = list(self.multiquery(adr, givenkeys, convert, debug))
failed = False
for i, ((k, v), r, b) in enumerate(zip(values, result, readback)):
if convert is as_float:
tol = max(abs(r) * 1e-3, abs(b) * 1e-3, tolerance)
if abs(r - b) > tol:
break
elif r != b:
break
else:
if abs(b - v) > tol or abs(r - v) > tol:
readback[i] = None
failed = True
elif b != v or r != v:
readback[i] = None
failed = True
if not failed:
return readback
self.log.warning('sent: %s', cmd)
self.log.warning('got: %s', debug[0])
return readback
self.log.warning('sent: %s', cmd)
self.log.warning('got: %s', debug[0])
return tuple(v[1] if b is None else b for b, v in zip(readback, values))
def query(self, adr, convert=as_float):
"""query a single parameter
@ -185,9 +194,9 @@ class MercuryChannel(HasIO):
adr, _, name = adr.rpartition(':')
return self.multiquery(adr, [name], convert)[0]
def change(self, adr, value, convert=as_float, tolerance=0, n_retry=3):
def change(self, adr, value, convert=as_float, tolerance=0, n_retry=3, lazy=False):
adr, _, name = adr.rpartition(':')
return self.multichange(adr, [(name, value)], convert, tolerance, n_retry)[0]
return self.multichange(adr, [(name, value)], convert, tolerance, n_retry, lazy)[0]
class TemperatureSensor(MercuryChannel, Readable):
@ -202,38 +211,14 @@ class TemperatureSensor(MercuryChannel, Readable):
return self.query('DEV::TEMP:SIG:RES')
class HasInput(MercuryChannel):
controlled_by = Parameter('source of target value', EnumType(members={'self': SELF}), default=0)
# do not know why this? target = Parameter(readonly=False)
input_callbacks = ()
def register_input(self, name, control_off):
"""register input
:param name: the name of the module (for controlled_by enum)
:param control_off: a method on the input module to switch off control
"""
if not self.input_callbacks:
self.input_callbacks = []
self.input_callbacks.append(control_off)
prev_enum = self.parameters['controlled_by'].datatype._enum
# add enum member, using autoincrement feature of Enum
self.parameters['controlled_by'].datatype = EnumType(Enum(prev_enum, **{name: None}))
def write_controlled_by(self, value):
if self.controlled_by == value:
return value
self.controlled_by = value
if value == SELF:
for control_off in self.input_callbacks:
control_off()
return value
class HasInput(HasControlledBy, MercuryChannel):
pass
class Loop(HasConvergence, MercuryChannel, Drivable):
class Loop(HasOutputModule, MercuryChannel, Drivable):
"""common base class for loops"""
control_active = Parameter('control mode', BoolType())
output_module = Attached(HasInput, mandatory=False)
control_active = Parameter(readonly=False)
ctrlpars = Parameter(
'pid (proportional band, integral time, differential time',
StructOf(p=FloatRange(0, unit='$'), i=FloatRange(0, unit='min'), d=FloatRange(0, unit='min')),
@ -241,35 +226,15 @@ class Loop(HasConvergence, MercuryChannel, Drivable):
)
enable_pid_table = Parameter('', BoolType(), readonly=False)
def initModule(self):
super().initModule()
if self.output_module:
self.output_module.register_input(self.name, self.control_off)
def control_off(self):
if self.control_active:
self.log.warning('switch to manual mode')
self.write_control_active(False)
def set_output(self, active):
def set_output(self, active, source='HW'):
if active:
if self.output_module and self.output_module.controlled_by != self.name:
self.output_module.write_controlled_by(self.name)
self.activate_control()
else:
if self.output_module and self.output_module.controlled_by != SELF:
self.output_module.write_controlled_by(SELF)
status = IDLE, 'control inactive'
if self.status != status:
self.status = status
self.deactivate_control(source)
def set_target(self, target):
if self.control_active:
self.set_output(True)
else:
self.log.warning('switch loop control on')
self.write_control_active(True)
self.set_output(True)
self.target = target
self.start_state()
def read_enable_pid_table(self):
return self.query(f'DEV::{self.kinds[0]}:LOOP:PIDT', off_on)
@ -286,8 +251,24 @@ class Loop(HasConvergence, MercuryChannel, Drivable):
pid = self.multichange(f'DEV::{self.kinds[0]}:LOOP', [(k, value[k.lower()]) for k in 'PID'])
return {k.lower(): v for k, v in zip('PID', pid)}
def read_status(self):
return IDLE, ''
class HeaterOutput(HasInput, MercuryChannel, Writable):
class ConvLoop(HasConvergence, Loop):
def deactivate_control(self, source):
if self.control_active:
super().deactivate_control(source)
self.convergence_state.start(self.inactive_state)
if self.pollInfo:
self.pollInfo.trigger(True)
def inactive_state(self, state):
self.convergence_state.status = IDLE, 'control inactive'
return Finish
class HeaterOutput(HasInput, Writable):
"""heater output
Remark:
@ -324,7 +305,7 @@ class HeaterOutput(HasInput, MercuryChannel, Writable):
if self._last_target is not None:
if not self.true_power:
self._volt_target = math.sqrt(self._last_target * self.resistivity)
self.change('DEV::HTR:SIG:VOLT', self._volt_target)
self.change('DEV::HTR:SIG:VOLT', self._volt_target, tolerance=2e-4)
return self.resistivity
def read_status(self):
@ -344,7 +325,7 @@ class HeaterOutput(HasInput, MercuryChannel, Writable):
self.write_resistivity(round(res, 1))
if self.controlled_by == 0:
self._volt_target = math.sqrt(self._last_target * self.resistivity)
self.change('DEV::HTR:SIG:VOLT', self._volt_target)
self.change('DEV::HTR:SIG:VOLT', self._volt_target, tolerance=2e-4)
return volt * current
def read_target(self):
@ -362,23 +343,25 @@ class HeaterOutput(HasInput, MercuryChannel, Writable):
might be used by a software loop
"""
self._volt_target = math.sqrt(target * self.resistivity)
self.change('DEV::HTR:SIG:VOLT', self._volt_target)
self.change('DEV::HTR:SIG:VOLT', self._volt_target, tolerance=2e-4)
self._last_target = target
return target
def write_target(self, value):
self.write_controlled_by(SELF)
self.self_controlled()
return self.set_target(value)
class TemperatureLoop(TemperatureSensor, Loop, Drivable):
class TemperatureLoop(TemperatureSensor, ConvLoop):
kind = 'TEMP'
output_module = Attached(HasInput, mandatory=False)
ramp = Parameter('ramp rate', FloatRange(0, unit='$/min'), readonly=False)
enable_ramp = Parameter('enable ramp rate', BoolType(), readonly=False)
setpoint = Parameter('working setpoint (differs from target when ramping)', FloatRange(0, unit='$'))
status = Parameter(datatype=StatusType(Drivable, 'RAMPING')) # add ramping status
tolerance = Parameter(default=0.1)
_last_setpoint_change = None
__status = IDLE, ''
# overridden in subclass frappy_psi.triton.TemperatureLoop
ENABLE = 'TEMP:LOOP:ENAB'
ENABLE_RAMP = 'TEMP:LOOP:RENA'
@ -394,7 +377,9 @@ class TemperatureLoop(TemperatureSensor, Loop, Drivable):
return active
def write_control_active(self, value):
self.set_output(value)
if value:
raise RangeError('write to target to switch control on')
self.set_output(value, 'user')
return self.change(f'DEV::{self.ENABLE}', value, off_on)
@nopoll # polled by read_setpoint
@ -407,43 +392,65 @@ class TemperatureLoop(TemperatureSensor, Loop, Drivable):
def read_setpoint(self):
setpoint = self.query('DEV::TEMP:LOOP:TSET')
if self.enable_ramp:
if setpoint == self.setpoint:
if setpoint == self.target:
self.__ramping = False
elif setpoint == self.setpoint:
# update target when working setpoint does no longer change
if setpoint != self.target and self._last_setpoint_change is not None:
if self._last_setpoint_change is not None:
unchanged_since = time.time() - self._last_setpoint_change
if unchanged_since > max(12.0, 0.06 / max(1e-4, self.ramp)):
self.__ramping = False
self.target = self.setpoint
return setpoint
self._last_setpoint_change = time.time()
else:
self.__ramping = False
self.target = setpoint
return setpoint
def set_target(self, target):
self.change(f'DEV::{self.ENABLE}', True, off_on)
super().set_target(target)
def deactivate_control(self, source):
if self.__ramping:
self.__ramping = False
# stop ramping setpoint
self.change('DEV::TEMP:LOOP:TSET', self.read_setpoint(), lazy=True)
super().deactivate_control(source)
def ramping_state(self, state):
self.read_setpoint()
if self.__ramping:
return Retry
return self.convergence_approach
def write_target(self, value):
target = self.change('DEV::TEMP:LOOP:TSET', value)
target = self.change('DEV::TEMP:LOOP:TSET', value, lazy=True)
if self.enable_ramp:
self._last_setpoint_change = None
self.__ramping = True
self.set_target(value)
self.convergence_state.status = RAMPING, 'ramping'
self.read_status()
self.convergence_state.start(self.ramping_state)
else:
self.set_target(target)
self.convergence_start()
self.read_status()
return self.target
def read_enable_ramp(self):
return self.query(f'DEV::{self.ENABLE_RAMP}', off_on)
def write_enable_ramp(self, value):
return self.change(f'DEV::{self.ENABLE_RAMP}', value, off_on)
def set_output(self, active):
if active:
if self.output_module and self.output_module.controlled_by != self.name:
self.output_module.write_controlled_by(self.name)
else:
if self.output_module and self.output_module.controlled_by != SELF:
self.output_module.write_controlled_by(SELF)
status = IDLE, 'control inactive'
if self.status != status:
self.status = status
if self.enable_ramp < value: # ramp_enable was off: start from current value
self.change('DEV::TEMP:LOOP:TSET', self.value, lazy=True)
result = self.change(f'DEV::{self.ENABLE_RAMP}', value, off_on)
if self.isDriving() and value != self.enable_ramp:
self.enable_ramp = value
self.write_target(self.target)
return result
def read_ramp(self):
result = self.query(f'DEV::{self.RAMP_RATE}')
@ -470,7 +477,7 @@ class PressureSensor(MercuryChannel, Readable):
return self.query('DEV::PRES:SIG:PRES')
class ValvePos(HasInput, MercuryChannel, Drivable):
class ValvePos(HasInput, Drivable):
kind = 'PRES,AUX'
value = Parameter('value pos', FloatRange(unit='%'), readonly=False)
target = Parameter('valve pos target', FloatRange(0, 100, unit='$'), readonly=False)
@ -491,11 +498,11 @@ class ValvePos(HasInput, MercuryChannel, Drivable):
return self.query('DEV::PRES:LOOP:FSET')
def write_target(self, value):
self.write_controlled_by(SELF)
self.self_controlled()
return self.change('DEV::PRES:LOOP:FSET', value)
class PressureLoop(HasInput, PressureSensor, Loop, Drivable):
class PressureLoop(PressureSensor, HasControlledBy, ConvLoop):
kind = 'PRES'
output_module = Attached(ValvePos, mandatory=False)
tolerance = Parameter(default=0.1)
@ -506,7 +513,7 @@ class PressureLoop(HasInput, PressureSensor, Loop, Drivable):
return active
def write_control_active(self, value):
self.set_output(value)
self.set_output(value, 'user')
return self.change('DEV::PRES:LOOP:FAUT', value, off_on)
def read_target(self):
@ -521,7 +528,7 @@ class PressureLoop(HasInput, PressureSensor, Loop, Drivable):
super().set_target(target)
def write_target(self, value):
self.write_controlled_by(SELF)
self.self_controlled()
self.set_target(value)
return value
@ -548,14 +555,13 @@ class HasAutoFlow:
self.needle_valve.register_input(self.name, self.auto_flow_off)
def write_auto_flow(self, value):
if value:
if self.needle_valve and self.needle_valve.controlled_by != self.name:
self.needle_valve.write_controlled_by(self.name)
else:
if self.needle_valve and self.needle_valve.controlled_by != SELF:
self.needle_valve.write_controlled_by(SELF)
_, (fmin, _) = self.flowpars
self.needle_valve.write_target(fmin)
if self.needle_valve:
if value:
self.needle_valve.controlled_by = self.name
else:
if self.needle_valve.controlled_by != SELF:
self.needle_valve.controlled_by = SELF
self.needle_valve.write_target(self.flowpars[1][0]) # flow min
return value
def auto_flow_off(self):