fix interplay mercury.TemperatureLoop and HasConvergence

frappy_psi.convergence.HasConvergence:
- no need to inherit HasStates (should be independent)
- trigger current state then tolerance or settling_time is changed

frappy_psi.mercury:
- improve readback checks
- fix interplay with HasControlledBy
- fix interplay with HasConvergence

Change-Id: I6efedbe6bbfba5a66ddd22ac441ebf38af11eda6
Reviewed-on: https://forge.frm2.tum.de/review/c/secop/frappy/+/31047
Tested-by: Jenkins Automated Tests <pedersen+jenkins@frm2.tum.de>
Reviewed-by: Markus Zolliker <markus.zolliker@psi.ch>
This commit is contained in:
2023-05-08 14:16:26 +02:00
parent bc2b860f31
commit 3a3ca0372b
3 changed files with 196 additions and 155 deletions

View File

@@ -396,3 +396,13 @@ class UniqueObject:
def __repr__(self): def __repr__(self):
return self.name return self.name
def merge_status(*args):
"""merge status
the status with biggest code wins
texts matching maximal code are joined with ', '
"""
maxcode = max(a[0] for a in args)
return maxcode, ', '.join([a[1] for a in args if a[0] == maxcode and a[1]])

View File

@@ -21,11 +21,11 @@
# ***************************************************************************** # *****************************************************************************
from frappy.core import Parameter, FloatRange, BUSY, IDLE, WARN from frappy.core import Parameter, FloatRange, BUSY, IDLE, WARN
from frappy.states import HasStates
from frappy.lib.statemachine import StateMachine, Retry, Stop from frappy.lib.statemachine import StateMachine, Retry, Stop
from frappy.lib import merge_status
class HasConvergence(HasStates): class HasConvergence:
"""mixin for convergence checks """mixin for convergence checks
Implementation based on tolerance, settling time and timeout. Implementation based on tolerance, settling time and timeout.
@@ -33,6 +33,8 @@ class HasConvergence(HasStates):
fly. However, the full history is not considered, which means for example fly. However, the full history is not considered, which means for example
that the spent time inside tolerance stored already is not altered when that the spent time inside tolerance stored already is not altered when
changing tolerance. changing tolerance.
does not inherit from HasStates (own state machine!)
""" """
tolerance = Parameter('absolute tolerance', FloatRange(0, unit='$'), readonly=False, default=0) tolerance = Parameter('absolute tolerance', FloatRange(0, unit='$'), readonly=False, default=0)
settling_time = Parameter( settling_time = Parameter(
@@ -51,118 +53,131 @@ class HasConvergence(HasStates):
As soon as the value is the first time within tolerance, the timeout criterium is changed: As soon as the value is the first time within tolerance, the timeout criterium is changed:
then the timeout event happens after this time + <settling_time> + <timeout>. then the timeout event happens after this time + <settling_time> + <timeout>.
''', FloatRange(0, unit='sec'), readonly=False, default=3600) ''', FloatRange(0, unit='sec'), readonly=False, default=3600)
status = Parameter('status determined from convergence checks', default=(IDLE, '')) status = Parameter() # make sure status is a parameter
convergence_state = None convergence_state = None # the state machine
def earlyInit(self): def earlyInit(self):
super().earlyInit() super().earlyInit()
self.convergence_state = StateMachine(threaded=False, logger=self.log, self.convergence_state = StateMachine(
cleanup=self.cleanup, spent_inside=0) threaded=False, logger=self.log, cleanup=self.convergence_cleanup,
status=(IDLE, ''), spent_inside=0, stop_status=(IDLE, 'stopped'))
def cleanup(self, state): def convergence_cleanup(self, state):
state.default_cleanup(state) state.default_cleanup(state)
if state.stopped: if state.stopped:
if state.stopped is Stop: # and not Restart if state.stopped is Stop: # and not Restart
self.status = WARN, 'stopped' self.__set_status(WARN, 'stopped')
else: else:
self.status = WARN, repr(state.last_error) self.__set_status(WARN, repr(state.last_error))
def doPoll(self): def doPoll(self):
super().doPoll() super().doPoll()
state = self.convergence_state state = self.convergence_state
state.cycle() state.cycle()
def get_min_slope(self, dif): def __set_status(self, *status):
if status != self.convergence_state.status:
self.convergence_state.status = status
self.read_status()
def read_status(self):
return merge_status(super().read_status(), self.convergence_state.status)
#self.log.warn('inner %r conv %r merged %r', super().read_status(), self.convergence_state.status, merged)
#return merged
def convergence_min_slope(self, dif):
"""calculate minimum expected slope"""
slope = getattr(self, 'workingramp', 0) or getattr(self, 'ramp', 0) slope = getattr(self, 'workingramp', 0) or getattr(self, 'ramp', 0)
if slope or not self.timeout: if slope or not self.timeout:
return slope return slope
return dif / self.timeout # assume exponential decay of dif, with time constant <tolerance> return dif / self.timeout # assume exponential decay of dif, with time constant <tolerance>
def get_dif_tol(self): def convergence_dif(self):
value = self.read_value() """get difference target - value and tolerance"""
tol = self.tolerance tol = self.tolerance
if not tol: if not tol:
tol = 0.01 * max(abs(self.target), abs(value)) tol = 0.01 * max(abs(self.target), abs(self.value))
dif = abs(self.target - value) dif = abs(self.target - self.value)
return dif, tol return dif, tol
def start_state(self): def convergence_start(self):
"""to be called from write_target""" """to be called from write_target"""
self.convergence_state.start(self.state_approach) self.__set_status(BUSY, 'changed_target')
self.convergence_state.start(self.convergence_approach)
def state_approach(self, state): def convergence_approach(self, state):
"""approaching, checking progress (busy)""" """approaching, checking progress (busy)"""
state.spent_inside = 0 state.spent_inside = 0
dif, tol = self.get_dif_tol() dif, tol = self.convergence_dif()
if dif < tol: if dif < tol:
state.timeout_base = state.now state.timeout_base = state.now
return self.state_inside return self.convergence_inside
if not self.timeout: if not self.timeout:
return Retry return Retry
if state.init: if state.init:
state.timeout_base = state.now state.timeout_base = state.now
state.dif_crit = dif # criterium for resetting timeout base state.dif_crit = dif # criterium for resetting timeout base
self.status = BUSY, 'approaching' self.__set_status(BUSY, 'approaching')
state.dif_crit -= self.get_min_slope(dif) * state.delta() state.dif_crit -= self.convergence_min_slope(dif) * state.delta()
if dif < state.dif_crit: # progress is good: reset timeout base if dif < state.dif_crit: # progress is good: reset timeout base
state.timeout_base = state.now state.timeout_base = state.now
elif state.now > state.timeout_base + self.timeout: elif state.now > state.timeout_base + self.timeout:
self.status = WARN, 'convergence timeout' self.__set_status(WARN, 'convergence timeout')
return self.state_instable return self.convergence_instable
return Retry return Retry
def state_inside(self, state): def convergence_inside(self, state):
"""inside tolerance, still busy""" """inside tolerance, still busy"""
dif, tol = self.get_dif_tol() dif, tol = self.convergence_dif()
if dif > tol: if dif > tol:
return self.state_outside return self.convergence_outside
state.spent_inside += state.delta() state.spent_inside += state.delta()
if state.spent_inside > self.settling_time: if state.spent_inside > self.settling_time:
self.status = IDLE, 'reached target' self.__set_status(IDLE, 'reached target')
return self.state_stable return self.convergence_stable
if state.init: if state.init:
self.status = BUSY, 'inside tolerance' self.__set_status(BUSY, 'inside tolerance')
return Retry return Retry
def state_outside(self, state): def convergence_outside(self, state):
"""temporarely outside tolerance, busy""" """temporarely outside tolerance, busy"""
dif, tol = self.get_dif_tol() dif, tol = self.convergence_dif()
if dif < tol: if dif < tol:
return self.state_inside return self.convergence_inside
if state.now > state.timeout_base + self.settling_time + self.timeout: if state.now > state.timeout_base + self.settling_time + self.timeout:
self.status = WARN, 'settling timeout' self.__set_status(WARN, 'settling timeout')
return self.state_instable return self.convergence_instable
if state.init: if state.init:
self.status = BUSY, 'outside tolerance' self.__set_status(BUSY, 'outside tolerance')
# do not reset the settling time on occasional outliers, count backwards instead # do not reset the settling time on occasional outliers, count backwards instead
state.spent_inside = max(0.0, state.spent_inside - state.delta()) state.spent_inside = max(0.0, state.spent_inside - state.delta())
return Retry return Retry
def state_stable(self, state): def convergence_stable(self, state):
"""stable, after settling_time spent within tolerance, idle""" """stable, after settling_time spent within tolerance, idle"""
dif, tol = self.get_dif_tol() dif, tol = self.convergence_dif()
if dif <= tol: if dif <= tol:
return Retry return Retry
self.status = WARN, 'instable' self.__set_status(WARN, 'instable')
state.spent_inside = max(self.settling_time, state.spent_inside) state.spent_inside = max(self.settling_time, state.spent_inside)
return self.state_instable return self.convergence_instable
def state_instable(self, state): def convergence_instable(self, state):
"""went outside tolerance from stable, warning""" """went outside tolerance from stable, warning"""
dif, tol = self.get_dif_tol() dif, tol = self.convergence_dif()
if dif <= tol: if dif <= tol:
state.spent_inside += state.delta() state.spent_inside += state.delta()
if state.spent_inside > self.settling_time: if state.spent_inside > self.settling_time:
self.status = IDLE, 'stable' # = recovered from instable self.__set_status(IDLE, 'stable') # = recovered from instable
return self.state_stable return self.convergence_stable
else: else:
state.spent_inside = max(0, state.spent_inside - state.delta()) state.spent_inside = max(0, state.spent_inside - state.delta())
return Retry return Retry
def state_interrupt(self, state): def convergence_interrupt(self, state):
"""stopping""" """stopping"""
self.status = IDLE, 'stopped' # stop called self.__set_status(state.stop_status) # stop called
return self.state_instable return self.convergence_instable
def stop(self): def stop(self):
"""set to idle when busy """set to idle when busy
@@ -170,4 +185,14 @@ class HasConvergence(HasStates):
does not stop control! does not stop control!
""" """
if self.isBusy(): if self.isBusy():
self.convergence_state.start(self.state_interrupt) self.convergence_state.start(self.convergence_interrupt)
def write_settling_time(self, value):
if self.pollInfo:
self.pollInfo.trigger(True)
return value
def write_tolerance(self, value):
if self.pollInfo:
self.pollInfo.trigger(True)
return value

View File

@@ -25,12 +25,13 @@ import math
import re import re
import time import time
from frappy.core import Drivable, HasIO, Writable, \ from frappy.core import Drivable, HasIO, Writable, StatusType, \
Parameter, Property, Readable, StringIO, Attached, IDLE, nopoll Parameter, Property, Readable, StringIO, Attached, IDLE, RAMPING, nopoll
from frappy.datatypes import EnumType, FloatRange, StringType, StructOf, BoolType, TupleOf from frappy.datatypes import EnumType, FloatRange, StringType, StructOf, BoolType, TupleOf
from frappy.errors import HardwareError, ProgrammingError, ConfigError from frappy.errors import HardwareError, ProgrammingError, ConfigError, RangeError
from frappy_psi.convergence import HasConvergence from frappy_psi.convergence import HasConvergence
from frappy.lib.enum import Enum from frappy.states import Retry, Finish
from frappy.mixins import HasOutputModule, HasControlledBy
VALUE_UNIT = re.compile(r'(.*\d|inf)([A-Za-z/%]*)$') VALUE_UNIT = re.compile(r'(.*\d|inf)([A-Za-z/%]*)$')
@@ -126,7 +127,7 @@ class MercuryChannel(HasIO):
msg = f'invalid reply {reply!r} to cmd {cmd!r}' msg = f'invalid reply {reply!r} to cmd {cmd!r}'
raise HardwareError(msg) from None raise HardwareError(msg) from None
def multichange(self, adr, values, convert=as_float, tolerance=0, n_retry=3): def multichange(self, adr, values, convert=as_float, tolerance=0, n_retry=3, lazy=False):
"""set parameter(s) in mercury syntax """set parameter(s) in mercury syntax
:param adr: as in multiquery method. SET: is added automatically :param adr: as in multiquery method. SET: is added automatically
@@ -134,6 +135,7 @@ class MercuryChannel(HasIO):
:param convert: a converter function (converts given value to string and replied string to value) :param convert: a converter function (converts given value to string and replied string to value)
:param tolerance: tolerance for readback check :param tolerance: tolerance for readback check
:param n_retry: number of retries or 0 for no readback check :param n_retry: number of retries or 0 for no readback check
:param lazy: check direct reply only (no additional query)
:return: the values as tuple :return: the values as tuple
Example (kind=TEMP, slot=DB6.T1: Example (kind=TEMP, slot=DB6.T1:
@@ -145,6 +147,7 @@ class MercuryChannel(HasIO):
adr = self._complete_adr(adr) adr = self._complete_adr(adr)
params = [f'{k}:{convert(v)}' for k, v in values] params = [f'{k}:{convert(v)}' for k, v in values]
cmd = f"SET:{adr}:{':'.join(params)}" cmd = f"SET:{adr}:{':'.join(params)}"
givenkeys = tuple(v[0] for v in values)
for _ in range(max(1, n_retry)): # try n_retry times or until readback result matches for _ in range(max(1, n_retry)): # try n_retry times or until readback result matches
reply = self.communicate(cmd) reply = self.communicate(cmd)
head = f'STAT:SET:{adr}:' head = f'STAT:SET:{adr}:'
@@ -153,29 +156,35 @@ class MercuryChannel(HasIO):
replyiter = iter(reply[len(head):].split(':')) replyiter = iter(reply[len(head):].split(':'))
# reshuffle reply=(k1, r1, v1, k2, r2, v1) --> keys = (k1, k2), result = (r1, r2), valid = (v1, v2) # reshuffle reply=(k1, r1, v1, k2, r2, v1) --> keys = (k1, k2), result = (r1, r2), valid = (v1, v2)
keys, result, valid = zip(*zip(replyiter, replyiter, replyiter)) keys, result, valid = zip(*zip(replyiter, replyiter, replyiter))
assert keys == tuple(k for k, _ in values) assert keys == givenkeys
assert any(v == 'VALID' for v in valid) assert any(v == 'VALID' for v in valid)
result = tuple(convert(r) for r in result) result = tuple(convert(r) for r in result)
except (AssertionError, AttributeError, ValueError) as e: except (AssertionError, AttributeError, ValueError) as e:
time.sleep(0.1) # in case of missed replies this might help to skip garbage time.sleep(0.1) # in case of missed replies this might help to skip garbage
raise HardwareError(f'invalid reply {reply!r} to cmd {cmd!r}') from e raise HardwareError(f'invalid reply {reply!r} to cmd {cmd!r}') from e
if n_retry == 0: if n_retry == 0:
return [v[1] for v in values] # no readback check return [v for _, v in values]
keys = [v[0] for v in values] if lazy:
debug = [] debug = [reply]
readback = self.multiquery(adr, keys, convert, debug) readback = [v for _, v in values]
for k, r, b in zip(keys, result, readback): else:
debug = []
readback = list(self.multiquery(adr, givenkeys, convert, debug))
failed = False
for i, ((k, v), r, b) in enumerate(zip(values, result, readback)):
if convert is as_float: if convert is as_float:
tol = max(abs(r) * 1e-3, abs(b) * 1e-3, tolerance) tol = max(abs(r) * 1e-3, abs(b) * 1e-3, tolerance)
if abs(r - b) > tol: if abs(b - v) > tol or abs(r - v) > tol:
break readback[i] = None
elif r != b: failed = True
break elif b != v or r != v:
else: readback[i] = None
failed = True
if not failed:
return readback return readback
self.log.warning('sent: %s', cmd) self.log.warning('sent: %s', cmd)
self.log.warning('got: %s', debug[0]) self.log.warning('got: %s', debug[0])
return readback return tuple(v[1] if b is None else b for b, v in zip(readback, values))
def query(self, adr, convert=as_float): def query(self, adr, convert=as_float):
"""query a single parameter """query a single parameter
@@ -185,9 +194,9 @@ class MercuryChannel(HasIO):
adr, _, name = adr.rpartition(':') adr, _, name = adr.rpartition(':')
return self.multiquery(adr, [name], convert)[0] return self.multiquery(adr, [name], convert)[0]
def change(self, adr, value, convert=as_float, tolerance=0, n_retry=3): def change(self, adr, value, convert=as_float, tolerance=0, n_retry=3, lazy=False):
adr, _, name = adr.rpartition(':') adr, _, name = adr.rpartition(':')
return self.multichange(adr, [(name, value)], convert, tolerance, n_retry)[0] return self.multichange(adr, [(name, value)], convert, tolerance, n_retry, lazy)[0]
class TemperatureSensor(MercuryChannel, Readable): class TemperatureSensor(MercuryChannel, Readable):
@@ -202,38 +211,14 @@ class TemperatureSensor(MercuryChannel, Readable):
return self.query('DEV::TEMP:SIG:RES') return self.query('DEV::TEMP:SIG:RES')
class HasInput(MercuryChannel): class HasInput(HasControlledBy, MercuryChannel):
controlled_by = Parameter('source of target value', EnumType(members={'self': SELF}), default=0) pass
# do not know why this? target = Parameter(readonly=False)
input_callbacks = ()
def register_input(self, name, control_off):
"""register input
:param name: the name of the module (for controlled_by enum)
:param control_off: a method on the input module to switch off control
"""
if not self.input_callbacks:
self.input_callbacks = []
self.input_callbacks.append(control_off)
prev_enum = self.parameters['controlled_by'].datatype._enum
# add enum member, using autoincrement feature of Enum
self.parameters['controlled_by'].datatype = EnumType(Enum(prev_enum, **{name: None}))
def write_controlled_by(self, value):
if self.controlled_by == value:
return value
self.controlled_by = value
if value == SELF:
for control_off in self.input_callbacks:
control_off()
return value
class Loop(HasConvergence, MercuryChannel, Drivable): class Loop(HasOutputModule, MercuryChannel, Drivable):
"""common base class for loops""" """common base class for loops"""
control_active = Parameter('control mode', BoolType())
output_module = Attached(HasInput, mandatory=False) output_module = Attached(HasInput, mandatory=False)
control_active = Parameter(readonly=False)
ctrlpars = Parameter( ctrlpars = Parameter(
'pid (proportional band, integral time, differential time', 'pid (proportional band, integral time, differential time',
StructOf(p=FloatRange(0, unit='$'), i=FloatRange(0, unit='min'), d=FloatRange(0, unit='min')), StructOf(p=FloatRange(0, unit='$'), i=FloatRange(0, unit='min'), d=FloatRange(0, unit='min')),
@@ -241,35 +226,15 @@ class Loop(HasConvergence, MercuryChannel, Drivable):
) )
enable_pid_table = Parameter('', BoolType(), readonly=False) enable_pid_table = Parameter('', BoolType(), readonly=False)
def initModule(self): def set_output(self, active, source='HW'):
super().initModule()
if self.output_module:
self.output_module.register_input(self.name, self.control_off)
def control_off(self):
if self.control_active:
self.log.warning('switch to manual mode')
self.write_control_active(False)
def set_output(self, active):
if active: if active:
if self.output_module and self.output_module.controlled_by != self.name: self.activate_control()
self.output_module.write_controlled_by(self.name)
else: else:
if self.output_module and self.output_module.controlled_by != SELF: self.deactivate_control(source)
self.output_module.write_controlled_by(SELF)
status = IDLE, 'control inactive'
if self.status != status:
self.status = status
def set_target(self, target): def set_target(self, target):
if self.control_active: self.set_output(True)
self.set_output(True)
else:
self.log.warning('switch loop control on')
self.write_control_active(True)
self.target = target self.target = target
self.start_state()
def read_enable_pid_table(self): def read_enable_pid_table(self):
return self.query(f'DEV::{self.kinds[0]}:LOOP:PIDT', off_on) return self.query(f'DEV::{self.kinds[0]}:LOOP:PIDT', off_on)
@@ -286,8 +251,24 @@ class Loop(HasConvergence, MercuryChannel, Drivable):
pid = self.multichange(f'DEV::{self.kinds[0]}:LOOP', [(k, value[k.lower()]) for k in 'PID']) pid = self.multichange(f'DEV::{self.kinds[0]}:LOOP', [(k, value[k.lower()]) for k in 'PID'])
return {k.lower(): v for k, v in zip('PID', pid)} return {k.lower(): v for k, v in zip('PID', pid)}
def read_status(self):
return IDLE, ''
class HeaterOutput(HasInput, MercuryChannel, Writable):
class ConvLoop(HasConvergence, Loop):
def deactivate_control(self, source):
if self.control_active:
super().deactivate_control(source)
self.convergence_state.start(self.inactive_state)
if self.pollInfo:
self.pollInfo.trigger(True)
def inactive_state(self, state):
self.convergence_state.status = IDLE, 'control inactive'
return Finish
class HeaterOutput(HasInput, Writable):
"""heater output """heater output
Remark: Remark:
@@ -324,7 +305,7 @@ class HeaterOutput(HasInput, MercuryChannel, Writable):
if self._last_target is not None: if self._last_target is not None:
if not self.true_power: if not self.true_power:
self._volt_target = math.sqrt(self._last_target * self.resistivity) self._volt_target = math.sqrt(self._last_target * self.resistivity)
self.change('DEV::HTR:SIG:VOLT', self._volt_target) self.change('DEV::HTR:SIG:VOLT', self._volt_target, tolerance=2e-4)
return self.resistivity return self.resistivity
def read_status(self): def read_status(self):
@@ -344,7 +325,7 @@ class HeaterOutput(HasInput, MercuryChannel, Writable):
self.write_resistivity(round(res, 1)) self.write_resistivity(round(res, 1))
if self.controlled_by == 0: if self.controlled_by == 0:
self._volt_target = math.sqrt(self._last_target * self.resistivity) self._volt_target = math.sqrt(self._last_target * self.resistivity)
self.change('DEV::HTR:SIG:VOLT', self._volt_target) self.change('DEV::HTR:SIG:VOLT', self._volt_target, tolerance=2e-4)
return volt * current return volt * current
def read_target(self): def read_target(self):
@@ -362,23 +343,25 @@ class HeaterOutput(HasInput, MercuryChannel, Writable):
might be used by a software loop might be used by a software loop
""" """
self._volt_target = math.sqrt(target * self.resistivity) self._volt_target = math.sqrt(target * self.resistivity)
self.change('DEV::HTR:SIG:VOLT', self._volt_target) self.change('DEV::HTR:SIG:VOLT', self._volt_target, tolerance=2e-4)
self._last_target = target self._last_target = target
return target return target
def write_target(self, value): def write_target(self, value):
self.write_controlled_by(SELF) self.self_controlled()
return self.set_target(value) return self.set_target(value)
class TemperatureLoop(TemperatureSensor, Loop, Drivable): class TemperatureLoop(TemperatureSensor, ConvLoop):
kind = 'TEMP' kind = 'TEMP'
output_module = Attached(HasInput, mandatory=False) output_module = Attached(HasInput, mandatory=False)
ramp = Parameter('ramp rate', FloatRange(0, unit='$/min'), readonly=False) ramp = Parameter('ramp rate', FloatRange(0, unit='$/min'), readonly=False)
enable_ramp = Parameter('enable ramp rate', BoolType(), readonly=False) enable_ramp = Parameter('enable ramp rate', BoolType(), readonly=False)
setpoint = Parameter('working setpoint (differs from target when ramping)', FloatRange(0, unit='$')) setpoint = Parameter('working setpoint (differs from target when ramping)', FloatRange(0, unit='$'))
status = Parameter(datatype=StatusType(Drivable, 'RAMPING')) # add ramping status
tolerance = Parameter(default=0.1) tolerance = Parameter(default=0.1)
_last_setpoint_change = None _last_setpoint_change = None
__status = IDLE, ''
# overridden in subclass frappy_psi.triton.TemperatureLoop # overridden in subclass frappy_psi.triton.TemperatureLoop
ENABLE = 'TEMP:LOOP:ENAB' ENABLE = 'TEMP:LOOP:ENAB'
ENABLE_RAMP = 'TEMP:LOOP:RENA' ENABLE_RAMP = 'TEMP:LOOP:RENA'
@@ -394,7 +377,9 @@ class TemperatureLoop(TemperatureSensor, Loop, Drivable):
return active return active
def write_control_active(self, value): def write_control_active(self, value):
self.set_output(value) if value:
raise RangeError('write to target to switch control on')
self.set_output(value, 'user')
return self.change(f'DEV::{self.ENABLE}', value, off_on) return self.change(f'DEV::{self.ENABLE}', value, off_on)
@nopoll # polled by read_setpoint @nopoll # polled by read_setpoint
@@ -407,43 +392,65 @@ class TemperatureLoop(TemperatureSensor, Loop, Drivable):
def read_setpoint(self): def read_setpoint(self):
setpoint = self.query('DEV::TEMP:LOOP:TSET') setpoint = self.query('DEV::TEMP:LOOP:TSET')
if self.enable_ramp: if self.enable_ramp:
if setpoint == self.setpoint: if setpoint == self.target:
self.__ramping = False
elif setpoint == self.setpoint:
# update target when working setpoint does no longer change # update target when working setpoint does no longer change
if setpoint != self.target and self._last_setpoint_change is not None: if self._last_setpoint_change is not None:
unchanged_since = time.time() - self._last_setpoint_change unchanged_since = time.time() - self._last_setpoint_change
if unchanged_since > max(12.0, 0.06 / max(1e-4, self.ramp)): if unchanged_since > max(12.0, 0.06 / max(1e-4, self.ramp)):
self.__ramping = False
self.target = self.setpoint self.target = self.setpoint
return setpoint return setpoint
self._last_setpoint_change = time.time() self._last_setpoint_change = time.time()
else: else:
self.__ramping = False
self.target = setpoint self.target = setpoint
return setpoint return setpoint
def set_target(self, target):
self.change(f'DEV::{self.ENABLE}', True, off_on)
super().set_target(target)
def deactivate_control(self, source):
if self.__ramping:
self.__ramping = False
# stop ramping setpoint
self.change('DEV::TEMP:LOOP:TSET', self.read_setpoint(), lazy=True)
super().deactivate_control(source)
def ramping_state(self, state):
self.read_setpoint()
if self.__ramping:
return Retry
return self.convergence_approach
def write_target(self, value): def write_target(self, value):
target = self.change('DEV::TEMP:LOOP:TSET', value) target = self.change('DEV::TEMP:LOOP:TSET', value, lazy=True)
if self.enable_ramp: if self.enable_ramp:
self._last_setpoint_change = None self._last_setpoint_change = None
self.__ramping = True
self.set_target(value) self.set_target(value)
self.convergence_state.status = RAMPING, 'ramping'
self.read_status()
self.convergence_state.start(self.ramping_state)
else: else:
self.set_target(target) self.set_target(target)
self.convergence_start()
self.read_status()
return self.target return self.target
def read_enable_ramp(self): def read_enable_ramp(self):
return self.query(f'DEV::{self.ENABLE_RAMP}', off_on) return self.query(f'DEV::{self.ENABLE_RAMP}', off_on)
def write_enable_ramp(self, value): def write_enable_ramp(self, value):
return self.change(f'DEV::{self.ENABLE_RAMP}', value, off_on) if self.enable_ramp < value: # ramp_enable was off: start from current value
self.change('DEV::TEMP:LOOP:TSET', self.value, lazy=True)
def set_output(self, active): result = self.change(f'DEV::{self.ENABLE_RAMP}', value, off_on)
if active: if self.isDriving() and value != self.enable_ramp:
if self.output_module and self.output_module.controlled_by != self.name: self.enable_ramp = value
self.output_module.write_controlled_by(self.name) self.write_target(self.target)
else: return result
if self.output_module and self.output_module.controlled_by != SELF:
self.output_module.write_controlled_by(SELF)
status = IDLE, 'control inactive'
if self.status != status:
self.status = status
def read_ramp(self): def read_ramp(self):
result = self.query(f'DEV::{self.RAMP_RATE}') result = self.query(f'DEV::{self.RAMP_RATE}')
@@ -470,7 +477,7 @@ class PressureSensor(MercuryChannel, Readable):
return self.query('DEV::PRES:SIG:PRES') return self.query('DEV::PRES:SIG:PRES')
class ValvePos(HasInput, MercuryChannel, Drivable): class ValvePos(HasInput, Drivable):
kind = 'PRES,AUX' kind = 'PRES,AUX'
value = Parameter('value pos', FloatRange(unit='%'), readonly=False) value = Parameter('value pos', FloatRange(unit='%'), readonly=False)
target = Parameter('valve pos target', FloatRange(0, 100, unit='$'), readonly=False) target = Parameter('valve pos target', FloatRange(0, 100, unit='$'), readonly=False)
@@ -491,11 +498,11 @@ class ValvePos(HasInput, MercuryChannel, Drivable):
return self.query('DEV::PRES:LOOP:FSET') return self.query('DEV::PRES:LOOP:FSET')
def write_target(self, value): def write_target(self, value):
self.write_controlled_by(SELF) self.self_controlled()
return self.change('DEV::PRES:LOOP:FSET', value) return self.change('DEV::PRES:LOOP:FSET', value)
class PressureLoop(HasInput, PressureSensor, Loop, Drivable): class PressureLoop(PressureSensor, HasControlledBy, ConvLoop):
kind = 'PRES' kind = 'PRES'
output_module = Attached(ValvePos, mandatory=False) output_module = Attached(ValvePos, mandatory=False)
tolerance = Parameter(default=0.1) tolerance = Parameter(default=0.1)
@@ -506,7 +513,7 @@ class PressureLoop(HasInput, PressureSensor, Loop, Drivable):
return active return active
def write_control_active(self, value): def write_control_active(self, value):
self.set_output(value) self.set_output(value, 'user')
return self.change('DEV::PRES:LOOP:FAUT', value, off_on) return self.change('DEV::PRES:LOOP:FAUT', value, off_on)
def read_target(self): def read_target(self):
@@ -521,7 +528,7 @@ class PressureLoop(HasInput, PressureSensor, Loop, Drivable):
super().set_target(target) super().set_target(target)
def write_target(self, value): def write_target(self, value):
self.write_controlled_by(SELF) self.self_controlled()
self.set_target(value) self.set_target(value)
return value return value
@@ -548,14 +555,13 @@ class HasAutoFlow:
self.needle_valve.register_input(self.name, self.auto_flow_off) self.needle_valve.register_input(self.name, self.auto_flow_off)
def write_auto_flow(self, value): def write_auto_flow(self, value):
if value: if self.needle_valve:
if self.needle_valve and self.needle_valve.controlled_by != self.name: if value:
self.needle_valve.write_controlled_by(self.name) self.needle_valve.controlled_by = self.name
else: else:
if self.needle_valve and self.needle_valve.controlled_by != SELF: if self.needle_valve.controlled_by != SELF:
self.needle_valve.write_controlled_by(SELF) self.needle_valve.controlled_by = SELF
_, (fmin, _) = self.flowpars self.needle_valve.write_target(self.flowpars[1][0]) # flow min
self.needle_valve.write_target(fmin)
return value return value
def auto_flow_off(self): def auto_flow_off(self):