fix statemachine

- fix: calling state.start(<new state>) on restart must ensure
  that the function <new state> is called before state.start()
  returns.
- modify slighly behaviour of cleanup function

Change-Id: I483a3aefa6af4712b3cf13f62c86d4c06edd1d8d
Reviewed-on: https://forge.frm2.tum.de/review/c/sine2020/secop/playground/+/28020
Tested-by: Jenkins Automated Tests <pedersen+jenkins@frm2.tum.de>
Reviewed-by: Enrico Faulhaber <enrico.faulhaber@frm2.tum.de>
Reviewed-by: Markus Zolliker <markus.zolliker@psi.ch>
This commit is contained in:
zolliker 2022-03-28 15:14:31 +02:00
parent bb097ac3ba
commit 8d23503bbd
3 changed files with 123 additions and 83 deletions

View File

@ -51,6 +51,7 @@ cleanup=<cleanup function> as argument in StateMachine.__init__ or .start
defines a cleanup function to be called whenever the machine is stopped or
an error is raised in a state function. A cleanup function may return
either None for finishing or a further state function for continuing.
In case of stop or restart, this return value is ignored.
State Specific Cleanup Code
@ -74,18 +75,13 @@ may be disabled. :meth:`cycle` must be called periodically in this case.
import time
import threading
import queue
from logging import getLogger
from secop.lib import mkthread
from secop.lib import mkthread, UniqueObject
class Stop(Exception):
"""exception indicating that StateMachine.stop was called"""
class Restart(Stop):
"""exception indicating that StateMachine.start was called
while the state machine was active"""
Stop = UniqueObject('Stop')
Restart = UniqueObject('Restart')
class Retry:
@ -96,13 +92,13 @@ class Retry:
class StateMachine:
"""a simple, but powerful state machine"""
# class attributes are not allowed to be overriden by kwds of __init__ or :meth:`start`
last_error = None # last exception
stop_exc = None # after a stop or restart, Stop or Restart
start_time = None # the time of last start
transition_time = None # the last time when the state changed
state = None # the current state
now = None
init = True
stopped = False
last_error = None # last exception raised or Stop or Restart
_last_time = 0
def __init__(self, state=None, logger=None, threaded=True, **kwds):
@ -115,19 +111,31 @@ class StateMachine:
"""
self.default_delay = 0.25 # default delay when returning None
self.now = time.time() # avoid calling time.time several times per state
self.cleanup = lambda *args: None # default cleanup: finish on error
self.cleanup = self.default_cleanup # default cleanup: finish on error
self.log = logger or getLogger('dummy')
self._update_attributes(kwds)
self._lock = threading.Lock()
self._stop_flag = False
self._trigger = threading.Event()
self._idle_event = threading.Event()
self._lock = threading.RLock()
self._threaded = threaded
if threaded:
self._thread_queue = queue.Queue()
self._idle_event = threading.Event()
self._thread = None
self._restart = None
if state:
self.start(state)
@staticmethod
def default_cleanup(state):
"""default cleanup
:param self: the state object
:return: None (for custom cleanup functions this might be a new state)
"""
if state.stopped: # stop or restart
state.log.debug('%sed in state %r', repr(state.stopped).lower(), state.status_string)
else:
state.log.warning('%r raised in state %r', state.last_error, state.status_string)
def _update_attributes(self, kwds):
"""update allowed attributes"""
cls = type(self)
@ -162,6 +170,7 @@ class StateMachine:
self.init = True
self.now = time.time()
self.transition_time = self.now
self.log.debug('state: %s', self.status_string)
def cycle(self):
"""do one cycle in the thread loop
@ -170,91 +179,115 @@ class StateMachine:
"""
if self.state is None:
return None
for _ in range(999):
self.now = time.time()
try:
ret = self.state(self)
self.init = False
if self.stop_exc:
raise self.stop_exc
except Exception as e:
# Stop and Restart are not unusual -> no warning
log = self.log.debug if isinstance(e, Stop) else self.log.warning
log('%r raised in state %r', e, self.status_string)
self.last_error = e
ret = self.cleanup(self)
self.log.debug('cleanup %r %r %r', self.cleanup, self.last_error, ret)
if ret is None:
if self._restart:
self._start(**self._restart)
else:
with self._lock:
for _ in range(999):
self.now = time.time()
try:
ret = self.state(self)
self.init = False
if self.stopped:
self.last_error = self.stopped
self.cleanup(self)
self.stopped = False
ret = None
except Exception as e:
self.last_error = e
ret = self.cleanup(self)
self.log.debug('called %r %sexc=%r', self.cleanup,
'ret=%r ' % ret if ret else '', e)
if ret is None:
self.log.debug('state: None')
self.state = None
self._idle_event.set()
return None
if callable(ret):
self._new_state(ret)
continue
if isinstance(ret, Retry):
if ret.delay == 0:
return None
if callable(ret):
self._new_state(ret)
continue
if ret.delay is None:
return self.default_delay
return ret.delay
self.last_error = RuntimeError('return value must be callable, Retry(...) or finish')
break
else:
self.last_error = RuntimeError('too many states chained - probably infinite loop')
self.state = None
return None
if isinstance(ret, Retry):
if ret.delay == 0:
continue
if ret.delay is None:
return self.default_delay
return ret.delay
self.last_error = RuntimeError('return value must be callable, Retry(...) or finish')
break
else:
self.last_error = RuntimeError('too many states chained - probably infinite loop')
self.cleanup(self)
self.state = None
return None
def _run(self):
"""thread loop"""
while True:
delay = self.cycle()
self._trigger.wait(delay) # when delay is None, waiting infinitely (or until triggered)
self._trigger.clear()
def _start(self, state, **kwds):
def trigger(self, delay=0):
if self._threaded:
if self._thread is None or not self._thread.is_alive():
# restart thread if dead (may happen when cleanup failed)
self._thread = mkthread(self._run)
self._thread_queue.put(delay)
def _run(self, delay):
"""thread loop
:param delay: delay before first state is called
"""
while True:
try:
ret = self._thread_queue.get(timeout=delay)
if ret is not None:
delay = ret
continue
except queue.Empty:
pass
delay = self.cycle()
def _start(self, state, first_delay, **kwds):
self._restart = None
self._idle_event.clear()
self._trigger.set() # wake up from idle state
self.last_error = None
self.stop_exc = None
self.stopped = False
self._update_attributes(kwds)
self._new_state(state)
self.start_time = self.now
self._last_time = self.now
if self._threaded:
if self._thread is None or not self._thread.is_alive():
# restart thread if dead (may happen when cleanup failed)
self._thread = mkthread(self._run, first_delay)
else:
self.trigger(first_delay)
def start(self, state, **kwds):
"""start with a new state
and interrupt the current state
the cleanup function will be called with last_error=Restart()
the cleanup function will be called with state.stopped=Restart
:param state: the first state
:param kwds: items to put as attributes on the state machine
"""
self.log.debug('start %r', kwds)
if self.state:
self.stop_exc = Restart()
self._trigger.set()
kwds['state'] = state
self._restart = kwds
return
self._start(state, **kwds)
self.stopped = Restart
with self._lock: # wait for running cycle finished
if self.stopped: # cleanup is not yet done
self.last_error = self.stopped
self.cleanup(self) # ignore return state on restart
self.stopped = False
delay = self.cycle()
self._start(state, delay, **kwds)
else:
delay = self.cycle() # important: call once (e.g. set status to busy)
self._start(state, delay, **kwds)
def stop(self):
"""stop machine, go to idle state
the cleanup function will be called with exc=Stop()
the cleanup function will be called with state.stopped=Stop
"""
self.log.debug('stop')
self.stop_exc = Stop()
self._trigger.set()
self.stopped = Stop
with self._lock:
if self.stopped: # cleanup is not yet done
self.last_error = self.stopped
self.cleanup(self) # ignore return state on restart
self.stopped = False
self.state = None
def wait(self, timeout=None):
"""wait for state machine being idle"""

View File

@ -20,8 +20,8 @@
#
# *****************************************************************************
from secop.core import Parameter, FloatRange, BUSY, IDLE, WARN, ERROR
from secop.lib.statemachine import StateMachine, Retry
from secop.core import Parameter, FloatRange, BUSY, IDLE, WARN
from secop.lib.statemachine import StateMachine, Retry, Stop
class HasConvergence:
@ -51,17 +51,25 @@ class HasConvergence:
then the timeout event happens after this time + <settling_time> + <timeout>.
''', FloatRange(0, unit='sec'), readonly=False, default=3600)
status = Parameter('status determined from convergence checks', default=(IDLE, ''))
convergence_state = None
def earlyInit(self):
super().earlyInit()
self.convergence_state = StateMachine(threaded=False, logger=self.log, spent_inside=0)
self.convergence_state = StateMachine(threaded=False, logger=self.log,
cleanup=self.cleanup, spent_inside=0)
def cleanup(self, state):
state.default_cleanup(state)
if self.stopped:
if self.stopped is Stop: # and not Restart
self.status = WARN, 'stopped'
else:
self.status = WARN, repr(state.last_error)
def doPoll(self):
super().doPoll()
state = self.convergence_state
state.cycle()
if not state.is_active and state.last_error is not None:
self.status = ERROR, repr(state.last_error)
def get_min_slope(self, dif):
slope = getattr(self, 'workingramp', 0) or getattr(self, 'ramp', 0)
@ -80,7 +88,6 @@ class HasConvergence:
def start_state(self):
"""to be called from write_target"""
self.convergence_state.start(self.state_approach)
self.convergence_state.cycle()
def state_approach(self, state):
"""approaching, checking progress (busy)"""
@ -105,8 +112,6 @@ class HasConvergence:
def state_inside(self, state):
"""inside tolerance, still busy"""
if state.init:
self.status = BUSY, 'inside tolerance'
dif, tol = self.get_dif_tol()
if dif > tol:
return self.state_outside
@ -114,18 +119,20 @@ class HasConvergence:
if state.spent_inside > self.settling_time:
self.status = IDLE, 'reached target'
return self.state_stable
if state.init:
self.status = BUSY, 'inside tolerance'
return Retry()
def state_outside(self, state):
"""temporarely outside tolerance, busy"""
if state.init:
self.status = BUSY, 'outside tolerance'
dif, tol = self.get_dif_tol()
if dif < tol:
return self.state_inside
if state.now > state.timeout_base + self.settling_time + self.timeout:
self.status = WARN, 'settling timeout'
return self.state_instable
if state.init:
self.status = BUSY, 'outside tolerance'
# do not reset the settling time on occasional outliers, count backwards instead
state.spent_inside = max(0.0, state.spent_inside - state.delta())
return Retry()

View File

@ -112,7 +112,7 @@ def test_stop():
s.cycle()
s.stop()
s.cycle()
assert isinstance(s.last_error, Stop)
assert s.last_error is Stop
assert s.state is None