From d5924567da8861967ec80b2897e66986c95da405 Mon Sep 17 00:00:00 2001 From: Markus Zolliker Date: Mon, 28 Mar 2022 15:14:31 +0200 Subject: [PATCH] fix statemachine - fix: calling state.start() on restart must ensure that the function is called before state.start() returns. - slightly modify behaviour of cleanup function Change-Id: I483a3aefa6af4712b3cf13f62c86d4c06edd1d8d Reviewed-on: https://forge.frm2.tum.de/review/c/sine2020/secop/playground/+/28020 Tested-by: Jenkins Automated Tests Reviewed-by: Enrico Faulhaber Reviewed-by: Markus Zolliker --- secop/lib/statemachine.py | 177 ++++++++++++++++++++++---------------- secop_psi/convergence.py | 27 +++--- test/test_statemachine.py | 2 +- 3 files changed, 123 insertions(+), 83 deletions(-) diff --git a/secop/lib/statemachine.py b/secop/lib/statemachine.py index 1742ff8..6d066c5 100644 --- a/secop/lib/statemachine.py +++ b/secop/lib/statemachine.py @@ -51,6 +51,7 @@ cleanup= as argument in StateMachine.__init__ or .start defines a cleanup function to be called whenever the machine is stopped or an error is raised in a state function. A cleanup function may return either None for finishing or a further state function for continuing. +In case of stop or restart, this return value is ignored. State Specific Cleanup Code @@ -74,18 +75,13 @@ may be disabled. :meth:`cycle` must be called periodically in this case. 
import time import threading +import queue from logging import getLogger -from secop.lib import mkthread +from secop.lib import mkthread, UniqueObject -class Stop(Exception): - """exception indicating that StateMachine.stop was called""" - - -class Restart(Stop): - """exception indicating that StateMachine.start was called - - while the state machine was active""" +Stop = UniqueObject('Stop') +Restart = UniqueObject('Restart') class Retry: @@ -96,13 +92,13 @@ class Retry: class StateMachine: """a simple, but powerful state machine""" # class attributes are not allowed to be overriden by kwds of __init__ or :meth:`start` - last_error = None # last exception - stop_exc = None # after a stop or restart, Stop or Restart start_time = None # the time of last start transition_time = None # the last time when the state changed state = None # the current state now = None init = True + stopped = False + last_error = None # last exception raised or Stop or Restart _last_time = 0 def __init__(self, state=None, logger=None, threaded=True, **kwds): @@ -115,19 +111,31 @@ class StateMachine: """ self.default_delay = 0.25 # default delay when returning None self.now = time.time() # avoid calling time.time several times per state - self.cleanup = lambda *args: None # default cleanup: finish on error + self.cleanup = self.default_cleanup # default cleanup: finish on error self.log = logger or getLogger('dummy') self._update_attributes(kwds) - self._lock = threading.Lock() - self._stop_flag = False - self._trigger = threading.Event() - self._idle_event = threading.Event() + self._lock = threading.RLock() self._threaded = threaded + if threaded: + self._thread_queue = queue.Queue() + self._idle_event = threading.Event() self._thread = None self._restart = None if state: self.start(state) + @staticmethod + def default_cleanup(state): + """default cleanup + + :param self: the state object + :return: None (for custom cleanup functions this might be a new state) + """ + if state.stopped: # 
stop or restart + state.log.debug('%sed in state %r', repr(state.stopped).lower(), state.status_string) + else: + state.log.warning('%r raised in state %r', state.last_error, state.status_string) + def _update_attributes(self, kwds): """update allowed attributes""" cls = type(self) @@ -162,6 +170,7 @@ class StateMachine: self.init = True self.now = time.time() self.transition_time = self.now + self.log.debug('state: %s', self.status_string) def cycle(self): """do one cycle in the thread loop @@ -170,91 +179,115 @@ class StateMachine: """ if self.state is None: return None - for _ in range(999): - self.now = time.time() - try: - ret = self.state(self) - self.init = False - if self.stop_exc: - raise self.stop_exc - except Exception as e: - # Stop and Restart are not unusual -> no warning - log = self.log.debug if isinstance(e, Stop) else self.log.warning - log('%r raised in state %r', e, self.status_string) - self.last_error = e - ret = self.cleanup(self) - self.log.debug('cleanup %r %r %r', self.cleanup, self.last_error, ret) - if ret is None: - if self._restart: - self._start(**self._restart) - else: + with self._lock: + for _ in range(999): + self.now = time.time() + try: + ret = self.state(self) + self.init = False + if self.stopped: + self.last_error = self.stopped + self.cleanup(self) + self.stopped = False + ret = None + except Exception as e: + self.last_error = e + ret = self.cleanup(self) + self.log.debug('called %r %sexc=%r', self.cleanup, + 'ret=%r ' % ret if ret else '', e) + if ret is None: + self.log.debug('state: None') self.state = None self._idle_event.set() - return None - if callable(ret): - self._new_state(ret) - continue - if isinstance(ret, Retry): - if ret.delay == 0: + return None + if callable(ret): + self._new_state(ret) continue - if ret.delay is None: - return self.default_delay - return ret.delay - self.last_error = RuntimeError('return value must be callable, Retry(...) 
or finish') - break - else: - self.last_error = RuntimeError('too many states chained - probably infinite loop') - self.state = None - return None + if isinstance(ret, Retry): + if ret.delay == 0: + continue + if ret.delay is None: + return self.default_delay + return ret.delay + self.last_error = RuntimeError('return value must be callable, Retry(...) or finish') + break + else: + self.last_error = RuntimeError('too many states chained - probably infinite loop') + self.cleanup(self) + self.state = None + return None - def _run(self): - """thread loop""" - while True: - delay = self.cycle() - self._trigger.wait(delay) # when delay is None, waiting infinitely (or until triggered) - self._trigger.clear() - - def _start(self, state, **kwds): + def trigger(self, delay=0): if self._threaded: - if self._thread is None or not self._thread.is_alive(): - # restart thread if dead (may happen when cleanup failed) - self._thread = mkthread(self._run) + self._thread_queue.put(delay) + + def _run(self, delay): + """thread loop + + :param delay: delay before first state is called + """ + while True: + try: + ret = self._thread_queue.get(timeout=delay) + if ret is not None: + delay = ret + continue + except queue.Empty: + pass + delay = self.cycle() + + def _start(self, state, first_delay, **kwds): self._restart = None self._idle_event.clear() - self._trigger.set() # wake up from idle state self.last_error = None - self.stop_exc = None + self.stopped = False self._update_attributes(kwds) self._new_state(state) self.start_time = self.now self._last_time = self.now + if self._threaded: + if self._thread is None or not self._thread.is_alive(): + # restart thread if dead (may happen when cleanup failed) + self._thread = mkthread(self._run, first_delay) + else: + self.trigger(first_delay) def start(self, state, **kwds): """start with a new state and interrupt the current state - the cleanup function will be called with last_error=Restart() + the cleanup function will be called with 
state.stopped=Restart :param state: the first state :param kwds: items to put as attributes on the state machine """ self.log.debug('start %r', kwds) if self.state: - self.stop_exc = Restart() - self._trigger.set() - kwds['state'] = state - self._restart = kwds - return - self._start(state, **kwds) + self.stopped = Restart + with self._lock: # wait for running cycle finished + if self.stopped: # cleanup is not yet done + self.last_error = self.stopped + self.cleanup(self) # ignore return state on restart + self.stopped = False + delay = self.cycle() + self._start(state, delay, **kwds) + else: + delay = self.cycle() # important: call once (e.g. set status to busy) + self._start(state, delay, **kwds) def stop(self): """stop machine, go to idle state - the cleanup function will be called with exc=Stop() + the cleanup function will be called with state.stopped=Stop """ self.log.debug('stop') - self.stop_exc = Stop() - self._trigger.set() + self.stopped = Stop + with self._lock: + if self.stopped: # cleanup is not yet done + self.last_error = self.stopped + self.cleanup(self) # ignore return state on restart + self.stopped = False + self.state = None def wait(self, timeout=None): """wait for state machine being idle""" diff --git a/secop_psi/convergence.py b/secop_psi/convergence.py index bf6c5c0..627cf83 100644 --- a/secop_psi/convergence.py +++ b/secop_psi/convergence.py @@ -20,8 +20,8 @@ # # ***************************************************************************** -from secop.core import Parameter, FloatRange, BUSY, IDLE, WARN, ERROR -from secop.lib.statemachine import StateMachine, Retry +from secop.core import Parameter, FloatRange, BUSY, IDLE, WARN +from secop.lib.statemachine import StateMachine, Retry, Stop class HasConvergence: @@ -51,17 +51,25 @@ class HasConvergence: then the timeout event happens after this time + + . 
''', FloatRange(0, unit='sec'), readonly=False, default=3600) status = Parameter('status determined from convergence checks', default=(IDLE, '')) + convergence_state = None def earlyInit(self): super().earlyInit() - self.convergence_state = StateMachine(threaded=False, logger=self.log, spent_inside=0) + self.convergence_state = StateMachine(threaded=False, logger=self.log, + cleanup=self.cleanup, spent_inside=0) + + def cleanup(self, state): + state.default_cleanup(state) + if self.stopped: + if self.stopped is Stop: # and not Restart + self.status = WARN, 'stopped' + else: + self.status = WARN, repr(state.last_error) def doPoll(self): super().doPoll() state = self.convergence_state state.cycle() - if not state.is_active and state.last_error is not None: - self.status = ERROR, repr(state.last_error) def get_min_slope(self, dif): slope = getattr(self, 'workingramp', 0) or getattr(self, 'ramp', 0) @@ -80,7 +88,6 @@ class HasConvergence: def start_state(self): """to be called from write_target""" self.convergence_state.start(self.state_approach) - self.convergence_state.cycle() def state_approach(self, state): """approaching, checking progress (busy)""" @@ -105,8 +112,6 @@ class HasConvergence: def state_inside(self, state): """inside tolerance, still busy""" - if state.init: - self.status = BUSY, 'inside tolerance' dif, tol = self.get_dif_tol() if dif > tol: return self.state_outside @@ -114,18 +119,20 @@ class HasConvergence: if state.spent_inside > self.settling_time: self.status = IDLE, 'reached target' return self.state_stable + if state.init: + self.status = BUSY, 'inside tolerance' return Retry() def state_outside(self, state): """temporarely outside tolerance, busy""" - if state.init: - self.status = BUSY, 'outside tolerance' dif, tol = self.get_dif_tol() if dif < tol: return self.state_inside if state.now > state.timeout_base + self.settling_time + self.timeout: self.status = WARN, 'settling timeout' return self.state_instable + if state.init: + self.status = 
BUSY, 'outside tolerance' # do not reset the settling time on occasional outliers, count backwards instead state.spent_inside = max(0.0, state.spent_inside - state.delta()) return Retry() diff --git a/test/test_statemachine.py b/test/test_statemachine.py index fb61dda..b835fb3 100644 --- a/test/test_statemachine.py +++ b/test/test_statemachine.py @@ -112,7 +112,7 @@ def test_stop(): s.cycle() s.stop() s.cycle() - assert isinstance(s.last_error, Stop) + assert s.last_error is Stop assert s.state is None