diff --git a/secop/lib/statemachine.py b/secop/lib/statemachine.py new file mode 100644 index 0000000..f24f00e --- /dev/null +++ b/secop/lib/statemachine.py @@ -0,0 +1,285 @@ +# -*- coding: utf-8 -*- +# ***************************************************************************** +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free Software +# Foundation; either version 2 of the License, or (at your option) any later +# version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# this program; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Module authors: +# Markus Zolliker +# +# ***************************************************************************** +"""a simple, but powerful state machine + +Mechanism +--------- + +The code for the state machine is NOT to be implemented as a subclass +of StateMachine, but usually as functions or methods of an other object. +The created state object may hold variables needed for the state. +A state function may return either: +- a function for the next state to transition to +- Retry() to keep the state and call the +- or `None` for finishing + + +Initialisation Code +------------------- + +For code to be called only after a state transition, use stateobj.init. + +def state_x(stateobj): + if stateobj.init: + ... code to be execute only after entering state x ... + ... further code ... + + +Cleanup Function +---------------- + +cleanup= as argument in StateMachine.__init__ or .start +defines a cleanup function to be called whenever the machine is stopped or +an error is raised in a state function. A cleanup function may return +either None for finishing or a further state function for continuing. + + +State Specific Cleanup Code +--------------------------- + +To execute state specific cleanup, the cleanup may examine the current state +(stateobj.state) in order to decide what to be done. + +If a need arises, a future extension to this library may support specific +cleanup functions by means of a decorator adding the specific cleanup function +as an attribute to the state function. + + +Threaded Use +------------ + +On start, a thread is started, which is waiting for a trigger event when the +machine is not active. For test purposes or special needs, the thread creation +may be disabled. :meth:`cycle` must be called periodically in this case. +""" + +import time +import threading +from logging import getLogger +from secop.lib import mkthread + + +class Stop(Exception): + """exception indicating that StateMachine.stop was called""" + + +class Restart(Stop): + """exception indicating that StateMachine.start was called + + while the state machine was active""" + + +class Retry: + def __init__(self, delay=None): + self.delay = delay + + +class StateMachine: + """a simple, but powerful state machine""" + # class attributes are not allowed to be overriden by kwds of __init__ or :meth:`start` + last_error = None # last exception + stop_exc = None # after a stop or restart, Stop or Restart + start_time = None # the time of last start + transition_time = None # the last time when the state changed + state = None # the current state + now = None + init = True + _last_time = 0 + + def __init__(self, state=None, logger=None, threaded=True, **kwds): + """initialize state machine + + :param state: if given, this is the first state + :param logger: an optional logger + :param threaded: whether a thread should be started (default: True) + :param kwds: any attributes for the state object + """ + self.default_delay = 0.25 # default delay when returning None + self.now = time.time() # avoid calling time.time several times per state + self.cleanup = lambda *args: None # default cleanup: finish on error + self.log = logger or getLogger('dummy') + self._update_attributes(kwds) + self._lock = threading.Lock() + self._stop_flag = False + self._trigger = threading.Event() + self._idle_event = threading.Event() + self._threaded = threaded + self._thread = None + self._restart = None + if state: + self.start(state) + + def _update_attributes(self, kwds): + """update allowed attributes""" + cls = type(self) + for key, value in kwds.items(): + if hasattr(cls, key): + raise AttributeError('can not set %s.%s' % (cls.__name__, key)) + setattr(self, key, value) + + @property + def is_active(self): + return bool(self.state) + + @property + def status_string(self): + if self.state is None: + return '' + doc = self.state.__doc__ + return doc.split('\n', 1)[0] if doc else self.state.__name__ + + @property + def state_time(self): + """the time spent already in this state""" + return self.now - self.transition_time + + @property + def run_time(self): + """time since last (re-)start""" + return self.now - self.start_time + + def _new_state(self, state): + self.state = state + self.init = True + self.now = time.time() + self.transition_time = self.now + + def cycle(self): + """do one cycle in the thread loop + + :return: a delay or None when idle + """ + if self.state is None: + return None + for _ in range(999): + self.now = time.time() + try: + ret = self.state(self) + self.init = False + if self.stop_exc: + raise self.stop_exc + except Exception as e: + self.log.info('%r raised in state %r', e, self.status_string) + self.last_error = e + ret = self.cleanup(self) + self.log.debug('cleanup %r %r %r', self.cleanup, self.last_error, ret) + if ret is None: + if self._restart: + self._start(**self._restart) + else: + self.state = None + self._idle_event.set() + return None + if callable(ret): + self._new_state(ret) + continue + if isinstance(ret, Retry): + if ret.delay == 0: + continue + if ret.delay is None: + return self.default_delay + return ret.delay + self.last_error = RuntimeError('return value must be callable, Retry(...) or finish') + break + else: + self.last_error = RuntimeError('too many states chained - probably infinite loop') + self.state = None + return None + + def _run(self): + """thread loop""" + while True: + delay = self.cycle() + self._trigger.wait(delay) # when delay is None, waiting infinitely (or until triggered) + self._trigger.clear() + + def _start(self, state, **kwds): + if self._threaded: + if self._thread is None or not self._thread.is_alive(): + # restart thread if dead (may happen when cleanup failed) + self._thread = mkthread(self._run) + self._restart = None + self._idle_event.clear() + self._trigger.set() # wake up from idle state + self.last_error = None + self.stop_exc = None + self._update_attributes(kwds) + self._new_state(state) + self.start_time = self.now + self._last_time = self.now + + def start(self, state, **kwds): + """start with a new state + + and interrupt the current state + the cleanup function will be called with last_error=Restart() + + :param state: the first state + :param kwds: items to put as attributes on the state machine + """ + self.log.debug('start %r', kwds) + if self.state: + self.stop_exc = Restart() + self._trigger.set() + kwds['state'] = state + self._restart = kwds + return + self._start(state, **kwds) + + def stop(self): + """stop machine, go to idle state + + the cleanup function will be called with exc=Stop() + """ + self.log.debug('stop') + self.stop_exc = Stop() + self._trigger.set() + + def wait(self, timeout=None): + """wait for state machine being idle""" + self._idle_event.wait(timeout) + + def delta(self, mindelta=0): + """helper method for time dependent control + + :param mindelta: minimum time since last call + :return: time delta or None when less than min delta time has passed + + to be called from within an state + + Usage: + + def state_x(self, state): + delta = state.delta(5) + if delta is None: + return # less than 5 seconds have passed, we wait for the next cycle + # delta is >= 5, and the zero time for delta is set + + # now we can use delta for control calculations + + remark: in the first step after start, state.delta(0) returns nearly 0 + """ + delta = self.now - self._last_time + if delta < mindelta: + return None + self._last_time = self.now + return delta diff --git a/test/test_statemachine.py b/test/test_statemachine.py new file mode 100644 index 0000000..fb61dda --- /dev/null +++ b/test/test_statemachine.py @@ -0,0 +1,148 @@ +# -*- coding: utf-8 -*- +# ***************************************************************************** +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free Software +# Foundation; either version 2 of the License, or (at your option) any later +# version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# this program; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Module authors: +# Markus Zolliker +# +# ***************************************************************************** + + +from secop.lib.statemachine import StateMachine, Stop, Retry + + +def rise(state): + state.step += 1 + print('rise', state.step) + if state.init: + state.status = 'rise' + state.level += 1 + if state.level > 3: + return turn + return Retry() + + +def turn(state): + state.step += 1 + if state.init: + state.status = 'turn' + state.direction += 1 + if state.direction > 3: + return fall + return Retry() + + +def fall(state): + state.step += 1 + if state.init: + state.status = 'fall' + state.level -= 1 + if state.level < 0: + raise ValueError('crash') + return Retry(0) # retry until crash! + + +def error_handler(state): + state.last_error_name = type(state.last_error).__name__ + + +class LoggerStub: + def debug(self, fmt, *args): + print(fmt % args) + info = warning = exception = error = debug + handlers = [] + + +class DummyThread: + def is_alive(self): + return True + + +def test_fun(): + s = StateMachine(step=0, status='', threaded=False, logger=LoggerStub()) + assert s.step == 0 + assert s.status == '' + s.cycle() # do nothing + assert s.step == 0 + s.start(rise, level=0, direction=0) + s.cycle() + for i in range(1, 4): + assert s.status == 'rise' + assert s.step == i + assert s.level == i + assert s.direction == 0 + s.cycle() + for i in range(5, 8): + assert s.status == 'turn' + assert s.step == i + assert s.level == 4 + assert s.direction == i - 4 + s.cycle() + s.cycle() # -> crash + assert isinstance(s.last_error, ValueError) + assert str(s.last_error) == 'crash' + assert s.state is None + + +def test_max_chain(): + s = StateMachine(step=0, status='', threaded=False, logger=LoggerStub()) + s.start(fall, level=999+1, direction=0) + s.cycle() + assert isinstance(s.last_error, RuntimeError) + assert s.state is None + + +def test_stop(): + s = StateMachine(step=0, status='', threaded=False, logger=LoggerStub()) + s.start(rise, level=0, direction=0) + for _ in range(1, 3): + s.cycle() + s.stop() + s.cycle() + assert isinstance(s.last_error, Stop) + assert s.state is None + + +def test_std_error_handling(): + s = StateMachine(step=0, status='', threaded=False, logger=LoggerStub()) + s.start(rise, level=0, direction=0) + s.cycle() + s.level = None # -> TypeError on next step + s.cycle() + assert s.state is None # default error handler: stop machine + assert isinstance(s.last_error, TypeError) + assert not hasattr(s, 'last_error_name') + + +def test_default_error_handling(): + s = StateMachine(step=0, status='', cleanup=error_handler, threaded=False, logger=LoggerStub()) + s.start(rise, level=0, direction=0) + s.cycle() + s.level = None + s.cycle() + assert s.state is None + assert s.last_error_name == 'TypeError' + assert isinstance(s.last_error, TypeError) + + +def test_cleanup_on_restart(): + s = StateMachine(step=0, status='', threaded=False, logger=LoggerStub()) + s.start(rise, level=0, direction=0) + s.cycle() + s.start(turn) + s.cycle() + assert s.state is turn + assert s.last_error is None