#!/usr/bin/env python # -*- coding: utf-8 -*- # ***************************************************************************** # # This program is free software; you can redistribute it and/or modify it under # the terms of the GNU General Public License as published by the Free Software # Foundation; either version 2 of the License, or (at your option) any later # version. # # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more # details. # # You should have received a copy of the GNU General Public License along with # this program; if not, write to the Free Software Foundation, Inc., # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # # Module authors: # Markus Zolliker # # ***************************************************************************** """mixin for modules with a statemachine""" from secop.lib.statemachine import StateMachine, Finish, Retry, StateHandler from secop.core import BUSY, IDLE, ERROR, Parameter from secop.errors import ProgrammingError class status_code: """decorator for state methods""" def __init__(self, code, text=None): self.code = code self.text = text def __set_name__(self, owner, name): if not issubclass(owner, HasStates): raise ProgrammingError('when using decorator "status_code", %s must inherit HasStates' % owner.__name__) self.cls = owner self.name = name if 'statusMap' not in owner.__dict__: # we need a copy on each inheritance level owner.statusMap = dict(owner.statusMap) owner.statusMap[name] = self.code, name.replace('_', ' ') if self.text is None else self.text setattr(owner, name, self.func) def __call__(self, func): self.func = func return self class HasStates(StateHandler): status = Parameter() # make sure this is a parameter skip_consecutive_status_changes = False _state_machine = None _next_cycle = 0 statusMap = {} def init_state_machine(self, fullstatus=True, **kwds): self._state_machine = StateMachine( logger=self.log, threaded=False, handler=self, default_delay=1e-3, # small but not zero (avoid infinite loop) **kwds) def initModule(self): super().initModule() self.init_state_machine() def on_error(self, statemachine, exc): """called on error""" error = '%r in %s' % (exc, statemachine.status_string) self.log.error('%s', error) return self.final_status(ERROR, error) def on_transition(self, statemachine, newstate): if not self.skip_consecutive_status_changes: self.set_status_from_state(newstate) def set_status_from_state(self, newstate): name = newstate.__name__ status = self.statusMap.get(name) if status is None: status = BUSY, name.replace('_', ' ') if status != self.status: self.status = status def doPoll(self): super().doPoll() now = self.pollInfo.last_main if now > self._next_cycle: delay = self._state_machine.cycle() if delay is None: self._next_cycle = 0 else: if self.skip_consecutive_status_changes: self.set_status_from_state(self._state_machine.state) self._next_cycle = now + delay def start_state(self, start_state, fast_poll=True, **kwds): self._state_machine.start(start_state, **kwds) if fast_poll is not None: self.setFastPoll(fast_poll) def final_status(self, code=IDLE, text='', fast_poll=False): self.status = code, text if fast_poll is not None: self.setFastPoll(fast_poll) return Finish