redesign of the state machine

With the current implementation, we run into a deadlock with the lock
from the state machine interfering with the accessLock on the module.
We can not wait for the state machine to finish while having the
accessLock locked by write_target. As a consequence, when restarting
the state machine we should not wait, but remember the state function
to call and postpone the restart after the cleanup has finished.
For this, we want to know the status before calling the state function.

- create HasState mixin, using doPoll for driving the machine
- StatusCode decorator for assigning a status to a state function
- remove the state machines 'threaded' option
- 'Retry' is now a unique value instead of a class. The retry period
  is determined by the (fast) poll interval.
- return 'Finish' instead of None for finishing the machine. returning
  None for state function is now an error, as this might happen
  easily inadvertently.

Change-Id: Icb31367442f10e98be69af3e05a84f12ce5cc966
This commit is contained in:
zolliker 2022-12-06 09:59:56 +01:00
parent d09634a55d
commit a14c282993
6 changed files with 573 additions and 265 deletions

View File

@ -29,113 +29,92 @@ of StateMachine, but usually as functions or methods of an other object.
The created state object may hold variables needed for the state.
A state function may return either:
- a function for the next state to transition to
- Retry(<delay>) to keep the state and call the
- or `None` for finishing
- Retry to keep the state and call the state function again
- or Finish for finishing
Initialisation Code
-------------------
For code to be called only after a state transition, use stateobj.init.
For code to be called only after a state transition, use statemachine.init.
def state_x(stateobj):
if stateobj.init:
def state_x(statemachine):
if statemachine.init:
... code to be execute only after entering state x ...
... further code ...
Cleanup Function
----------------
Restart
-------
cleanup=<cleanup function> as argument in StateMachine.__init__ or .start
defines a cleanup function to be called whenever the machine is stopped or
an error is raised in a state function. A cleanup function may return
either None for finishing or a further state function for continuing.
In case of stop or restart, this return value is ignored.
To restart the statemachine, call statemachine.start. The current task is interrupted,
the cleanup sequence is called, and after this the machine is restarted with the
arguments of the start method.
State Specific Cleanup Code
---------------------------
Stop
----
To execute state specific cleanup, the cleanup may examine the current state
(stateobj.state) in order to decide what to be done.
If a need arises, a future extension to this library may support specific
cleanup functions by means of a decorator adding the specific cleanup function
as an attribute to the state function.
To stop the statemachine, call statemachine.stop. The current task is interrupted,
the cleanup sequence is called, and the machine finishes.
Threaded Use
------------
Cleaning Up
-----------
On start, a thread is started, which is waiting for a trigger event when the
machine is not active. For test purposes or special needs, the thread creation
may be disabled. :meth:`cycle` must be called periodically in this case.
A cleanup function might be added as arguments to StateMachine.start.
On error, stop or restart, the cleanup sequence will be executed.
The cleanup itself is not be interrupted:
- if a further exeception is raised, the machine is interrupted immediately
- if start or stop is called again, a previous start or stop is ignored.
The currently running cleanup sequence is finished, and not started again.
"""
import time
import threading
import queue
from logging import getLogger
from frappy.lib import mkthread, UniqueObject
from frappy.lib import UniqueObject
Retry = UniqueObject('Retry')
Finish = UniqueObject('Finish')
Stop = UniqueObject('Stop')
Restart = UniqueObject('Restart')
class Start:
def __init__(self, newstate, kwds):
self.newstate = newstate
self.kwds = kwds # statemachine attributes
class Retry:
def __init__(self, delay=None):
self.delay = delay
class Stop:
pass
class StateMachine:
"""a simple, but powerful state machine"""
# class attributes are not allowed to be overriden by kwds of __init__ or :meth:`start`
start_time = None # the time of last start
transition_time = None # the last time when the state changed
state = None # the current state
now = None
init = True
stopped = False
last_error = None # last exception raised or Stop or Restart
_last_time = 0
statefunc = None # the current statefunc
now = None # the current time (avoid mutiple calls within a state)
init = True # True only in the first call of a state after a transition
next_task = None # None or an instance of Start or Stop
cleanup_reason = None # None or an instance of Exception, Start or Stop
_last_time = 0 # for delta method
def __init__(self, state=None, logger=None, threaded=True, **kwds):
def __init__(self, statefunc=None, logger=None, **kwds):
"""initialize state machine
:param state: if given, this is the first state
:param statefunc: if given, this is the first statefunc
:param logger: an optional logger
:param threaded: whether a thread should be started (default: True)
:param kwds: any attributes for the state object
"""
self.default_delay = 0.25 # default delay when returning None
self.now = time.time() # avoid calling time.time several times per state
self.cleanup = self.default_cleanup # default cleanup: finish on error
self.cleanup = None
self.transition = None
self.maxloops = 10 # the maximum number of statefunc functions called in sequence without Retry
self.now = time.time() # avoids calling time.time several times per statefunc
self.log = logger or getLogger('dummy')
self._lock = threading.Lock()
self._update_attributes(kwds)
self._lock = threading.RLock()
self._threaded = threaded
if threaded:
self._thread_queue = queue.Queue()
self._idle_event = threading.Event()
self._thread = None
self._restart = None
if state:
self.start(state)
@staticmethod
def default_cleanup(state):
"""default cleanup
:param self: the state object
:return: None (for custom cleanup functions this might be a new state)
"""
if state.stopped: # stop or restart
verb = 'stopped' if state.stopped is Stop else 'restarted'
state.log.debug('%s in state %r', verb, state.status_string)
else:
state.log.warning('%r raised in state %r', state.last_error, state.status_string)
if statefunc:
self.start(statefunc)
def _update_attributes(self, kwds):
"""update allowed attributes"""
@ -145,154 +124,101 @@ class StateMachine:
raise AttributeError('can not set %s.%s' % (cls.__name__, key))
setattr(self, key, value)
def _cleanup(self, reason):
if isinstance(reason, Exception):
self.log.warning('%s: raised %r', self.statefunc.__name__, reason)
elif isinstance(reason, Stop):
self.log.debug('stopped in %s', self.statefunc.__name__)
else: # must be Start
self.log.debug('restart %s during %s', reason.newstate.__name__, self.statefunc.__name__)
if self.cleanup_reason is None:
self.cleanup_reason = reason
if not self.cleanup:
return None # no cleanup needed or cleanup already handled
with self._lock:
cleanup, self.cleanup = self.cleanup, None
ret = None
try:
ret = cleanup(self) # pylint: disable=not-callable # None or function
if not (ret is None or callable(ret)):
self.log.error('%s: return value must be callable or None, not %r',
self.statefunc.__name__, ret)
ret = None
except Exception as e:
self.log.exception('%r raised in cleanup', e)
return ret
@property
def is_active(self):
return bool(self.state)
return bool(self.statefunc)
@property
def status_string(self):
if self.state is None:
return ''
doc = self.state.__doc__
return doc.split('\n', 1)[0] if doc else self.state.__name__
@property
def state_time(self):
"""the time spent already in this state"""
return self.now - self.transition_time
@property
def run_time(self):
"""time since last (re-)start"""
return self.now - self.start_time
def _new_state(self, state):
self.state = state
def _new_state(self, statefunc):
if self.transition:
self.transition(self, statefunc) # pylint: disable=not-callable # None or function
self.init = True
self.now = time.time()
self.transition_time = self.now
self.log.debug('state: %s', self.status_string)
self.statefunc = statefunc
self._last_time = self.now
def cycle(self):
"""do one cycle in the thread loop
"""do one cycle
:return: a delay or None when idle
call state functions until Retry is returned
"""
with self._lock:
if self.state is None:
return None
for _ in range(999):
self.now = time.time()
try:
ret = self.state(self)
self.init = False
if self.stopped:
self.last_error = self.stopped
self.cleanup(self)
self.stopped = False
ret = None
except Exception as e:
self.last_error = e
ret = self.cleanup(self)
self.log.debug('called %r %sexc=%r', self.cleanup,
'ret=%r ' % ret if ret else '', e)
if ret is None:
self.log.debug('state: None after cleanup')
self.state = None
self._idle_event.set()
return None
if callable(ret):
for _ in range(2):
if self.statefunc:
for _ in range(self.maxloops):
self.now = time.time()
if self.next_task and not self.cleanup_reason:
# interrupt only when not cleaning up
ret = self._cleanup(self.next_task)
else:
try:
ret = self.statefunc(self)
self.init = False
if ret is Retry:
return
if ret is Finish:
break
if not callable(ret):
ret = self._cleanup(RuntimeError(
'%s: return value must be callable, Retry or Finish, not %r'
% (self.statefunc.__name__, ret)))
except Exception as e:
ret = self._cleanup(e)
if ret is None:
break
self._new_state(ret)
continue
if isinstance(ret, Retry):
if ret.delay == 0:
else:
ret = self._cleanup(RuntimeError(
'%s: too many states chained - probably infinite loop' % self.statefunc.__name__))
if ret:
self._new_state(ret)
continue
if ret.delay is None:
return self.default_delay
return ret.delay
self.last_error = RuntimeError('return value must be callable, Retry(...) or finish')
break
else:
self.last_error = RuntimeError('too many states chained - probably infinite loop')
self.cleanup(self)
self.state = None
return None
if self.cleanup_reason is None:
self.log.debug('finish in state %r', self.statefunc.__name__)
self._new_state(None)
if self.next_task:
with self._lock:
action, self.next_task = self.next_task, None
self.cleanup_reason = None
if isinstance(action, Start):
self._new_state(action.newstate)
self._update_attributes(action.kwds)
def trigger(self, delay=0):
if self._threaded:
self._thread_queue.put(delay)
def _run(self, delay):
"""thread loop
:param delay: delay before first state is called
"""
while True:
try:
ret = self._thread_queue.get(timeout=delay)
if ret is not None:
delay = ret
continue
except queue.Empty:
pass
delay = self.cycle()
def _start(self, state, **kwds):
self._restart = None
self._idle_event.clear()
self.last_error = None
self.stopped = False
self._update_attributes(kwds)
self._new_state(state)
self.start_time = self.now
self._last_time = self.now
first_delay = self.cycle() # important: call once (e.g. set status to busy)
if self._threaded:
if self._thread is None or not self._thread.is_alive():
# restart thread if dead (may happen when cleanup failed)
if first_delay is not None:
self._thread = mkthread(self._run, first_delay)
else:
self.trigger(first_delay)
def start(self, state, **kwds):
def start(self, statefunc, **kwds):
"""start with a new state
and interrupt the current state
the cleanup function will be called with state.stopped=Restart
:param state: the first state
:param statefunc: the first state
:param kwds: items to put as attributes on the state machine
"""
self.log.debug('start %r', kwds)
if self.state:
self.stopped = Restart
with self._lock: # wait for running cycle finished
if self.stopped: # cleanup is not yet done
self.last_error = self.stopped
self.cleanup(self) # ignore return state on restart
self.stopped = False
self._start(state, **kwds)
else:
self._start(state, **kwds)
kwds.setdefault('cleanup', None) # cleanup must be given on each restart
with self._lock:
self.next_task = Start(statefunc, kwds)
def stop(self):
"""stop machine, go to idle state
the cleanup function will be called with state.stopped=Stop
"""
self.log.debug('stop')
self.stopped = Stop
"""stop machine, go to idle state"""
with self._lock:
if self.stopped: # cleanup is not yet done
self.last_error = self.stopped
self.cleanup(self) # ignore return state on restart
self.stopped = False
self.state = None
def wait(self, timeout=None):
"""wait for state machine being idle"""
self._idle_event.wait(timeout)
self.next_task = Stop()
def delta(self, mindelta=0):
"""helper method for time dependent control
@ -300,7 +226,7 @@ class StateMachine:
:param mindelta: minimum time since last call
:return: time delta or None when less than min delta time has passed
to be called from within an state
to be called from within an state function
Usage:

View File

@ -227,8 +227,13 @@ class PollInfo:
self.fast_flag = False
self.trigger_event = trigger_event
def trigger(self):
"""trigger a recalculation of poll due times"""
def trigger(self, immediate=False):
"""trigger a recalculation of poll due times
:param immediate: when True, doPoll should be called as soon as possible
"""
if immediate:
self.last_main = 0
self.trigger_event.set()
def update_interval(self, pollinterval):

197
frappy/states.py Normal file
View File

@ -0,0 +1,197 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# *****************************************************************************
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
#
# *****************************************************************************
"""state machine mixin
handles status depending on statemachine state
"""
from frappy.core import BUSY, IDLE, ERROR, Parameter, Command
from frappy.errors import ProgrammingError
from frappy.lib.statemachine import StateMachine, Finish, Start, Stop, \
Retry # pylint: disable=unused-import
class StatusCode:
"""decorator for state methods
:param code: the code assigned to the state function
:param text: the text assigned to the state function
if not given, the text is taken from the state functions name
"""
def __init__(self, code, text=None):
self.code = code
self.text = text
def __set_name__(self, owner, name):
if not issubclass(owner, HasStates):
raise ProgrammingError('when using decorator "status_code", %s must inherit HasStates' % owner.__name__)
self.cls = owner
self.name = name
if 'statusMap' not in owner.__dict__:
# we need a copy on each inheritance level
owner.statusMap = owner.statusMap.copy()
owner.statusMap[name] = self.code, name.replace('_', ' ') if self.text is None else self.text
setattr(owner, name, self.func) # replace with original method
def __call__(self, func):
self.func = func
return self
class HasStates:
"""mixin for modules needing a statemachine"""
status = Parameter() # make sure this is a parameter
_state_machine = None
statusMap = {} # a dict populated with status values for methods used as state functions
def init_state_machine(self, **kwds):
"""initialize the state machine
might be overridden in order to add additional attributes initialized
:param kwds: additional attributes
"""
self._state_machine = StateMachine(
logger=self.log,
idle_status=(IDLE, ''),
transition=self.state_transition,
reset_fast_poll=False,
status_text='',
**kwds)
def initModule(self):
super().initModule()
self.init_state_machine()
def state_transition(self, sm, newstate):
"""handle status updates"""
status = self.get_status(newstate)
if status is not None:
# if a status_code is given, remember the text of this state
sm.status_text = status[1]
if isinstance(sm.next_task, Stop):
if newstate:
status = self.status[0], 'stopping (%s)' % sm.status_text
elif isinstance(sm.next_task, Start):
next_status = self.get_status(sm.next_task.newstate, BUSY)
if newstate:
# restart case
status = next_status[0], 'restarting (%s)' % sm.status_text
else:
# start case
status = next_status
if status is None:
return # no status_code given -> no change
if status != self.status:
self.status = status
def get_status(self, statefunc, default_code=None):
"""get the status assigned to a statefunc
:param statefunc: the state function to get the status from. if None, the idle_status attribute
of the state machine is returned
:param default_code: if None, None is returned in case no status_code is attached to statefunc
otherwise the returned status is composed by default_code and the modified name of the statefuncs
:return: a status or None
"""
if statefunc is None:
status = self._state_machine.idle_status or (ERROR, 'Finish was returned without final status')
else:
name = statefunc.__name__
status = self.statusMap.get(name)
if status is None and default_code is not None:
status = default_code, name.replace('_', ' ')
print('get_status', statefunc, status, default_code)
return status
def doPoll(self):
super().doPoll()
sm = self._state_machine
sm.cycle()
if sm.statefunc is None and sm.reset_fast_poll:
sm.reset_fast_poll = False
self.setFastPoll(False)
def start_machine(self, statefunc, fast_poll=True, cleanup=None, **kwds):
"""start or restart the state machine
:param statefunc: the initial state to be called
:param fast_poll: flag to indicate that polling has to switched to fast
:param cleanup: a cleanup function
:param kwds: attributes to be added to the state machine on start
If the state machine is already running, the following happens:
1) the currently executing state function, if any, is finished
2) in case the cleanup attribute on the state machine object is not None,
it is called and subsequently the state functions returned are executed,
until a state function returns None or Finish. However, in case a cleanup
sequence is already running, this is finished instead.
3) only then, the new cleanup function and all the attributes given
in kwds are set on the state machine
4) the state machine continues at the given statefunc
"""
sm = self._state_machine
status = self.get_status(statefunc, BUSY)
if sm.statefunc:
status = status[0], 'restarting'
self.status = status
sm.status_text = status[1]
sm.start(statefunc, cleanup=cleanup, **kwds)
if fast_poll:
sm.reset_fast_poll = True
self.setFastPoll(True)
self.pollInfo.trigger(True) # trigger poller
def stop_machine(self, stopped_status=(IDLE, 'stopped')):
"""stop the currently running machine
:param stopped_status: status to be set after stopping
If the state machine is not running, nothing happens.
Else the state machine is stoppen, the predefined cleanup
sequence is executed and then the status is set to the value
given in the sopped_status argument.
An already running cleanup sequence is not executed again.
"""
sm = self._state_machine
if sm.is_active:
sm.idle_status = stopped_status
sm.stop()
self.status = self.status[0], 'stopping'
self.pollInfo.trigger(True) # trigger poller
@Command
def stop(self):
self.stop_machine()
def final_status(self, code=IDLE, text=''):
"""final status
Usage:
return self.final_status('IDLE', 'machine idle')
"""
sm = self._state_machine
sm.idle_status = code, text
return Finish

View File

@ -97,7 +97,7 @@ class HasConvergence:
state.timeout_base = state.now
return self.state_inside
if not self.timeout:
return Retry()
return Retry
if state.init:
state.timeout_base = state.now
state.dif_crit = dif # criterium for resetting timeout base
@ -108,7 +108,7 @@ class HasConvergence:
elif state.now > state.timeout_base + self.timeout:
self.status = WARN, 'convergence timeout'
return self.state_instable
return Retry()
return Retry
def state_inside(self, state):
"""inside tolerance, still busy"""
@ -121,7 +121,7 @@ class HasConvergence:
return self.state_stable
if state.init:
self.status = BUSY, 'inside tolerance'
return Retry()
return Retry
def state_outside(self, state):
"""temporarely outside tolerance, busy"""
@ -135,13 +135,13 @@ class HasConvergence:
self.status = BUSY, 'outside tolerance'
# do not reset the settling time on occasional outliers, count backwards instead
state.spent_inside = max(0.0, state.spent_inside - state.delta())
return Retry()
return Retry
def state_stable(self, state):
"""stable, after settling_time spent within tolerance, idle"""
dif, tol = self.get_dif_tol()
if dif <= tol:
return Retry()
return Retry
self.status = WARN, 'instable'
state.spent_inside = max(self.settling_time, state.spent_inside)
return self.state_instable
@ -156,7 +156,7 @@ class HasConvergence:
return self.state_stable
else:
state.spent_inside = max(0, state.spent_inside - state.delta())
return Retry()
return Retry
def state_interrupt(self, state):
"""stopping"""

View File

@ -126,7 +126,7 @@ class MotorValve(PersistentMixin, Drivable):
self.motor.stop()
self.status = ERROR, 'opening valve failed (home switch not released)'
return None
return Retry()
return Retry
motvalue = self.motor.read_value()
if abs(motvalue - self.turns * 360) < 5:
self.read_value() # value = opened, status = IDLE
@ -147,7 +147,7 @@ class MotorValve(PersistentMixin, Drivable):
if self.motor.isBusy():
if self.motor.home:
return self.find_closed
return Retry()
return Retry
motvalue = self.motor.read_value()
if abs(motvalue) > 5:
if state.count > 0:
@ -171,7 +171,7 @@ class MotorValve(PersistentMixin, Drivable):
if not self.motor.home:
self.motor.stop()
return None
return Retry()
return Retry
motvalue = self.motor.read_value()
if motvalue < -360:
self.read_value() # status -> error
@ -180,7 +180,7 @@ class MotorValve(PersistentMixin, Drivable):
# moved by more than 5 deg
state.prev = self.motor.value
self.motor.write_target(-360)
return Retry()
return Retry
if motvalue > 5:
self.status = ERROR, 'closing valve failed (zero not reached)'
return None
@ -216,7 +216,7 @@ class MotorValve(PersistentMixin, Drivable):
self.status = ERROR, 'ref run failed, can not find home switch'
return None
if not self.motor.home:
return Retry()
return Retry
self.motor.write_speed(self.lowspeed)
state.prev = self.motor.read_value()
self.motor.write_target(state.prev - 360)
@ -226,13 +226,13 @@ class MotorValve(PersistentMixin, Drivable):
def ref_released(self, state):
if self.motor.isBusy():
if self.motor.home:
return Retry()
return Retry
elif self.motor.read_home():
if state.count > 0:
state.count -= 1
self.log.info('home switch not released, try again')
self.motor.write_target(self.motor.target)
return Retry()
return Retry
self.status = ERROR, 'ref run failed, can not release home switch'
return None
return self.ref_home
@ -242,7 +242,7 @@ class MotorValve(PersistentMixin, Drivable):
if not self.motor.home:
self.motor.stop()
return None
return Retry()
return Retry
self.motor.set_zero(max(-50, (self.motor.read_value() - state.prev) * 0.5))
self.read_value() # check home button is valid
if abs(self.motor.target - self.motor.value) < 5:

View File

@ -21,18 +21,30 @@
# *****************************************************************************
from frappy.lib.statemachine import StateMachine, Stop, Retry
from frappy.core import Drivable, Parameter
from frappy.datatypes import StatusType, Enum
from frappy.states import StateMachine, Stop, Retry, Finish, Start, HasStates, StatusCode
class LoggerStub:
def info(self, fmt, *args):
print(fmt % args)
def debug(self, fmt, *args):
pass
warning = exception = error = info
handlers = []
def rise(state):
state.step += 1
print('rise', state.step)
if state.init:
state.status = 'rise'
state.level += 1
if state.level > 3:
return turn
return Retry()
return Retry
def turn(state):
@ -42,7 +54,7 @@ def turn(state):
state.direction += 1
if state.direction > 3:
return fall
return Retry()
return Retry
def fall(state):
@ -52,32 +64,35 @@ def fall(state):
state.level -= 1
if state.level < 0:
raise ValueError('crash')
return Retry(0) # retry until crash!
return fall # retry until crash!
def error_handler(state):
state.last_error_name = type(state.last_error).__name__
def finish(state):
return None
class LoggerStub:
def debug(self, fmt, *args):
print(fmt % args)
info = warning = exception = error = debug
handlers = []
class Result:
cleanup_reason = None
def __init__(self):
self.states = []
class DummyThread:
def is_alive(self):
return True
def on_error(self, sm):
self.cleanup_reason = sm.cleanup_reason
def on_transition(self, sm, newstate):
self.states.append(newstate)
def test_fun():
s = StateMachine(step=0, status='', threaded=False, logger=LoggerStub())
obj = Result()
s = StateMachine(step=0, status='', transition=obj.on_transition, logger=LoggerStub())
assert s.step == 0
assert s.status == ''
s.cycle() # do nothing
assert s.step == 0
s.start(rise, level=0, direction=0)
s.start(rise, cleanup=obj.on_error, level=0, direction=0)
s.cycle()
for i in range(1, 4):
assert s.status == 'rise'
assert s.step == i
@ -91,56 +106,221 @@ def test_fun():
assert s.direction == i - 4
s.cycle()
s.cycle() # -> crash
assert isinstance(s.last_error, ValueError)
assert str(s.last_error) == 'crash'
assert s.state is None
assert isinstance(obj.cleanup_reason, ValueError)
assert str(obj.cleanup_reason) == 'crash'
assert obj.states == [rise, turn, fall, fall, fall, fall, fall, None]
assert s.statefunc is None
def test_max_chain():
s = StateMachine(step=0, status='', threaded=False, logger=LoggerStub())
s.start(fall, level=999+1, direction=0)
assert isinstance(s.last_error, RuntimeError)
assert s.state is None
obj = Result()
s = StateMachine(step=0, status='', transition=obj.on_transition, logger=LoggerStub())
s.start(fall, cleanup=obj.on_error, level=999+1, direction=0)
s.cycle()
assert isinstance(obj.cleanup_reason, RuntimeError)
assert s.statefunc is None
def test_stop():
s = StateMachine(step=0, status='', threaded=False, logger=LoggerStub())
s.start(rise, level=0, direction=0)
for _ in range(1, 3):
obj = Result()
s = StateMachine(step=0, status='', transition=obj.on_transition, logger=LoggerStub())
s.start(rise, cleanup=obj.on_error, level=0, direction=0)
for _ in range(3):
s.cycle()
s.stop()
s.cycle()
assert s.last_error is Stop
assert s.state is None
assert isinstance(obj.cleanup_reason, Stop)
assert obj.states == [rise, None]
assert s.statefunc is None
def test_std_error_handling():
s = StateMachine(step=0, status='', threaded=False, logger=LoggerStub())
s.start(rise, level=0, direction=0)
def test_error_handling():
obj = Result()
s = StateMachine(step=0, status='', transition=obj.on_transition, logger=LoggerStub())
s.start(rise, cleanup=obj.on_error, level=0, direction=0)
s.cycle()
s.level = None # -> TypeError on next step
s.cycle()
assert s.state is None # default error handler: stop machine
assert isinstance(s.last_error, TypeError)
assert not hasattr(s, 'last_error_name')
def test_default_error_handling():
s = StateMachine(step=0, status='', cleanup=error_handler, threaded=False, logger=LoggerStub())
s.start(rise, level=0, direction=0)
s.cycle()
s.level = None
s.cycle()
assert s.state is None
assert s.last_error_name == 'TypeError'
assert isinstance(s.last_error, TypeError)
assert isinstance(obj.cleanup_reason, TypeError)
assert obj.states == [rise, None]
assert s.statefunc is None
def test_cleanup_on_restart():
s = StateMachine(step=0, status='', threaded=False, logger=LoggerStub())
s.start(rise, level=0, direction=0)
def test_on_restart():
obj = Result()
s = StateMachine(step=0, status='', transition=obj.on_transition, logger=LoggerStub())
s.start(rise, cleanup=obj.on_error, level=0, direction=0)
s.cycle()
s.cycle()
s.start(turn)
s.cycle()
assert s.state is turn
assert s.last_error is None
assert isinstance(obj.cleanup_reason, Start)
obj.cleanup_reason = None
s.cycle()
assert s.statefunc is turn
assert obj.cleanup_reason is None
assert obj.states == [rise, None, turn]
def test_finish():
obj = Result()
s = StateMachine(step=0, status='', transition=obj.on_transition, logger=LoggerStub())
s.start(finish, cleanup=obj.on_error, level=0, direction=0)
s.cycle()
s.cycle()
assert obj.states == [finish, None]
assert s.statefunc is None
Status = Enum(
Drivable.Status,
PREPARED=150,
PREPARING=340,
RAMPING=370,
STABILIZING=380,
FINALIZING=390,
)
class DispatcherStub:
# the first update from the poller comes a very short time after the
# initial value from the timestamp. However, in the test below
# the second update happens after the updates dict is cleared
# -> we have to inhibit the 'omit unchanged update' feature
omit_unchanged_within = 0
def __init__(self, updates):
self.updates = updates
def announce_update(self, modulename, pname, pobj):
assert modulename == 'obj'
if pobj.readerror:
self.updates.append((pname, pobj.readerror))
else:
self.updates.append((pname, pobj.value))
class ServerStub:
def __init__(self, updates):
self.dispatcher = DispatcherStub(updates)
class Mod(HasStates, Drivable):
status = Parameter(datatype=StatusType(Status))
_my_time = 0
def artificial_time(self):
return self._my_time
def on_cleanup(self, sm):
return self.cleanup_one
def state_transition(self, sm, newstate):
self.statelist.append(getattr(newstate, '__name__', None))
super().state_transition(sm, newstate)
def state_one(self, sm):
if sm.init:
return Retry
return self.state_two
@StatusCode('PREPARING', 'state 2')
def state_two(self, sm):
return self.state_three
@StatusCode('FINALIZING')
def state_three(self, sm):
if sm.init:
return Retry
return self.final_status('IDLE', 'finished')
@StatusCode('BUSY')
def cleanup_one(self, sm):
if sm.init:
return Retry
print('one 2')
return self.cleanup_two
def cleanup_two(self, sm):
if sm.init:
return Retry
return Finish
def doPoll(self):
super().doPoll()
self._my_time += 1
def create_module():
updates = []
obj = Mod('obj', LoggerStub(), {'.description': ''}, ServerStub(updates))
obj.initModule()
obj.statelist = []
try:
obj._Module__pollThread(obj.polledModules, None)
except TypeError:
pass # None is not callable
updates.clear()
return obj, updates
def test_updates():
obj, updates = create_module()
obj.start_machine(obj.state_one)
for _ in range(10):
obj.doPoll()
assert updates == [
('status', (Status.BUSY, 'state one')), # default: BUSY, function name without '_'
('status', (Status.PREPARING, 'state 2')), # explicitly given
('status', (Status.FINALIZING, 'state three')), # only code given
('status', (Status.IDLE, 'finished')),
]
def test_stop_without_cleanup():
obj, updates = create_module()
obj.start_machine(obj.state_one)
obj.doPoll()
obj.stop_machine()
for _ in range(10):
obj.doPoll()
assert updates == [
('status', (Status.BUSY, 'state one')),
('status', (Status.BUSY, 'stopping')),
('status', (Status.IDLE, 'stopped')),
]
assert obj.statelist == ['state_one', None]
def test_stop_with_cleanup():
obj, updates = create_module()
obj.start_machine(obj.state_one, cleanup=obj.on_cleanup)
obj.doPoll()
obj.stop_machine()
for _ in range(10):
obj.doPoll()
assert updates == [
('status', (Status.BUSY, 'state one')),
('status', (Status.BUSY, 'stopping')),
('status', (Status.BUSY, 'stopping (cleanup one)')),
('status', (Status.IDLE, 'stopped')),
]
assert obj.statelist == ['state_one', 'cleanup_one', 'cleanup_two', None]
def test_all_restart():
obj, updates = create_module()
obj.start_machine(obj.state_one, cleanup=obj.on_cleanup, statelist=[])
obj.doPoll()
obj.start_machine(obj.state_three)
for _ in range(10):
obj.doPoll()
assert updates == [
('status', (Status.BUSY, 'state one')),
('status', (Status.FINALIZING, 'restarting')),
('status', (Status.FINALIZING, 'restarting (cleanup one)')),
('status', (Status.FINALIZING, 'state three')),
('status', (Status.IDLE, 'finished')),
]
assert obj.statelist == ['state_one', 'cleanup_one', 'cleanup_two', None, 'state_three', None]