mercury, as of 2022-02-01

Change-Id: Ifdbb2afc827b894874edaec50e82b645023beda9
This commit is contained in:
zolliker 2022-02-02 09:55:43 +01:00
parent 9109170752
commit 903e17a6e5

View File

@ -27,8 +27,9 @@ import time
from secop.core import Drivable, HasIodev, \ from secop.core import Drivable, HasIodev, \
Parameter, Property, Readable, StringIO Parameter, Property, Readable, StringIO
from secop.datatypes import EnumType, FloatRange, StringType from secop.datatypes import EnumType, FloatRange, StringType, StructOf
from secop.errors import HardwareError from secop.errors import HardwareError
from secop.lib.statemachine import StateMachine
class MercuryIO(StringIO): class MercuryIO(StringIO):
@ -120,7 +121,7 @@ class HasProgressCheck:
changing tolerance. changing tolerance.
""" """
tolerance = Parameter('absolute tolerance', FloatRange(0), readonly=False, default=0) tolerance = Parameter('absolute tolerance', FloatRange(0), readonly=False, default=0)
relative_tolerance = Parameter('_', FloatRange(0, 1), readonly=False, default=0) min_slope = Parameter('minimal abs(slope)', FloatRange(0), readonly=False, default=0)
settling_time = Parameter( settling_time = Parameter(
'''settling time '''settling time
@ -130,75 +131,76 @@ class HasProgressCheck:
'''timeout '''timeout
timeout = 0: disabled, else: timeout = 0: disabled, else:
A timeout happens, when the difference value - target is not improved by more than A timeout event happens, when the difference (target - value) is not improved by
a factor 2 within timeout. at least min_slope * timeout over any interval (t, t + timeout).
As soon as the value is the first time within tolerance, the criterium is changed:
More precisely, we expect a convergence curve which decreases the difference then the timeout event happens after this time + settling_time + timeout.
by a factor 2 within timeout/2.
If this expected progress is delayed by more than timeout/2, a timeout happens.
If the convergence is better than above, the expected curve is adjusted continuously.
In case the tolerance is reached once, a timeout happens when the time after this is
exceeded by more than settling_time + timeout.
''', FloatRange(0, unit='sec'), readonly=False, default=3600) ''', FloatRange(0, unit='sec'), readonly=False, default=3600)
status = Parameter('status determined from progress check') status = Parameter('status determined from progress check')
value = Parameter() value = Parameter()
target = Parameter() target = Parameter()
_settling_start = None # supposed start of settling time (0 when outside) def earlyInit(self):
_first_inside = None # first time within tolerance super().earlyInit()
_spent_inside = 0 # accumulated settling time self.__state = StateMachine()
# the upper limit for t0, for the curve timeout_dif * 2 ** -(t - t0)/timeout not touching abs(value(t) - target)
_timeout_base = 0
_timeout_dif = 1
def check_progress(self, value, target): def prepare_state(self, state):
"""called from read_status tol = self.tolerance
if not tol:
tol = 0.01 * max(abs(self.target), abs(self.value))
dif = abs(self.target - self.value)
return dif, tol, state.now, state.delta(0)
intended to be also be used for alternative implementations of read_status def state_approaching(self, state):
""" if self.init():
base = max(abs(target), abs(value)) self.status = 'BUSY', 'approaching'
tol = base * self.relative_tolerance + self.tolerance dif, tol, now, delta = self.prepare_state(state)
if tol == 0: if dif < tol:
tol = max(0.01, base * 0.01) state.timeout_base = now
now = time.time() state.next_step(self.state_inside)
dif = abs(value - target) return
if self._settling_start: # we were inside tol if not self.timeout:
self._spent_inside = now - self._settling_start return
if dif > tol: # transition inside -> outside if state.init():
self._settling_start = None state.timeout_base = now
else: # we were outside tol state.dif_crit = dif
if dif <= tol: # transition outside -> inside return
if not self._first_inside: min_slope = getattr(self, 'ramp', 0) or getattr('min_slope', 0)
self._first_inside = now state.dif_crit -= min_slope * delta
self._settling_start = now - self._spent_inside if dif < state.dif_crit:
if self._spent_inside > self.settling_time: state.timeout_base = now
return 'IDLE', '' elif now > state.timeout_base:
result = 'BUSY', ('inside tolerance' if self._settling_start else 'outside tolerance') self.status = 'WARNING', 'convergence timeout'
if self.timeout: state.next_action(self.state_idle)
if self._first_inside:
if now > self._first_inside + self.settling_time + self.timeout:
return 'WARNING', 'settling timeout'
return result
tmo2 = self.timeout / 2
def exponential_convergence(t): def state_inside(self, state):
return self._timeout_dif * 2 ** -(t - self._timeout_base) / tmo2 if state.init():
self.status = 'BUSY', 'inside tolerance'
dif, tol, now, delta = self.prepare_state(state)
if dif > tol:
state.next_action(self.state_outside)
state.spent_inside += delta
if state.spent_inside > self.settling_time:
self.status = 'IDLE', 'reached target'
state.next_action(self.state_idle)
if dif < exponential_convergence(now): def state_outside(self, state, now, dif, tol, delta):
# convergence is better than estimated, update expected curve if state.init():
self._timeout_dif = dif self.status = 'BUSY', 'outside tolerance'
self._timeout_base = now dif, tol, now, delta = self.prepare_state(state)
elif dif > exponential_convergence(now - tmo2): if dif < tol:
return 'WARNING', 'convergence timeout' state.next_action(self.state_inside)
return result elif now > self.timeout_base + self.settling_time + self.timeout:
self.status = 'WARNING', 'settling timeout'
state.next_action(self.state_idle)
def reset_progress(self, value, target): def start_state(self):
"""must be called from write_target, whenever the target changes""" """must be called from write_target, whenever the target changes"""
self._settling_start = None self.__state.start(self.state_approach)
self._first_inside = None
self._spent_inside = 0 def poll(self):
self._timeout_base = time.time() super().poll()
self._timeout_dif = abs(value - target) self.__state.poll()
def read_status(self): def read_status(self):
if self.status[0] == 'IDLE': if self.status[0] == 'IDLE':
@ -213,13 +215,15 @@ class HasProgressCheck:
class Loop(HasProgressCheck, MercuryChannel): class Loop(HasProgressCheck, MercuryChannel):
"""common base class for loops""" """common base class for loops"""
mode = Parameter('control mode', EnumType(manual=0, pid=1), readonly=False) mode = Parameter('control mode', EnumType(manual=0, pid=1), readonly=False)
prop = Parameter('pid proportional band', FloatRange(), readonly=False) ctrlpars = Parameter(
integ = Parameter('pid integral parameter', FloatRange(unit='min'), readonly=False) 'pid (proportional nad, integral time, differential time',
deriv = Parameter('pid differential parameter', FloatRange(unit='min'), readonly=False) StructOf(p=FloatRange(0, unit='$'), i=FloatRange(0, unit='min'), d=FloatRange(0, unit='min')),
readonly=False, poll=True
)
"""pid = Parameter('control parameters', StructOf(p=FloatRange(), i=FloatRange(), d=FloatRange()),readonly=False)""" """pid = Parameter('control parameters', StructOf(p=FloatRange(), i=FloatRange(), d=FloatRange()),readonly=False)"""
pid_table_mode = Parameter('', EnumType(off=0, on=1), readonly=False) pid_table_mode = Parameter('', EnumType(off=0, on=1), readonly=False)
def read_prop(self): def read_ctrlpars(self):
return self.query('0:LOOP:P') return self.query('0:LOOP:P')
def read_integ(self): def read_integ(self):