diff --git a/frappy/io.py b/frappy/io.py index 307fa97..d2cca96 100644 --- a/frappy/io.py +++ b/frappy/io.py @@ -63,7 +63,7 @@ class HasIO(Module): io = self.ioClass(ioname, srv.log.getChild(ioname), opts, srv) # pylint: disable=not-callable io.callingModule = [] srv.modules[ioname] = io - srv.dispatcher.register_module(io, ioname) + srv.secnode.add_module(io, ioname) self.ioDict[self.uri] = ioname self.io = ioname diff --git a/frappy/logging.py b/frappy/logging.py index 085ec8c..be3bcb2 100644 --- a/frappy/logging.py +++ b/frappy/logging.py @@ -55,6 +55,8 @@ class RemoteLogHandler(mlzlog.Handler): def __init__(self): super().__init__() self.subscriptions = {} # dict[modname] of tuple(mobobj, dict [conn] of level) + # None will be replaced by a callback when one is first installed + self.send_log = None def emit(self, record): """unused""" @@ -62,18 +64,18 @@ class RemoteLogHandler(mlzlog.Handler): def handle(self, record): modname = record.name.split('.')[-1] try: - modobj, subscriptions = self.subscriptions[modname] + subscriptions = self.subscriptions[modname] except KeyError: return for conn, lev in subscriptions.items(): if record.levelno >= lev: - modobj.DISPATCHER.send_log_msg( - conn, modobj.name, LEVEL_NAMES[record.levelno], + self.send_log( # pylint: disable=not-callable + conn, modname, LEVEL_NAMES[record.levelno], record.getMessage()) - def set_conn_level(self, modobj, conn, level): + def set_conn_level(self, modname, conn, level): level = check_level(level) - modobj, subscriptions = self.subscriptions.setdefault(modobj.name, (modobj, {})) + subscriptions = self.subscriptions.setdefault(modname, {}) if level == OFF: subscriptions.pop(conn, None) else: @@ -127,7 +129,7 @@ class HasComlog: if self.comlog and generalConfig.initialized and generalConfig.comlog: self._comLog = mlzlog.Logger(f'COMLOG.{self.name}') self._comLog.handlers[:] = [] - directory = join(logger.logdir, logger.rootname, 'comlog', self.DISPATCHER.name) + directory = join(logger.logdir, logger.rootname, 'comlog', self.secNode.name) self._comLog.addHandler(ComLogfileHandler( directory, self.name, max_days=generalConfig.getint('comlog_days', 7))) return diff --git a/frappy/modulebase.py b/frappy/modulebase.py new file mode 100644 index 0000000..9d25648 --- /dev/null +++ b/frappy/modulebase.py @@ -0,0 +1,858 @@ +# ***************************************************************************** +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free Software +# Foundation; either version 2 of the License, or (at your option) any later +# version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# this program; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Module authors: +# Enrico Faulhaber +# Markus Zolliker +# Alexander Zaft +# +# ***************************************************************************** +"""Defines the base Module class""" + + +import time +import threading +from collections import OrderedDict + +from frappy.datatypes import ArrayOf, BoolType, EnumType, FloatRange, \ + IntRange, StringType, TextType, TupleOf, \ + NoneOr +from frappy.errors import BadValueError, CommunicationFailedError, ConfigError, \ + ProgrammingError, SECoPError, secop_error, RangeError +from frappy.lib import formatException, mkthread, UniqueObject +from frappy.params import Accessible, Command, Parameter, Limit +from frappy.properties import HasProperties, Property +from frappy.logging import RemoteLogHandler + +# TODO: resolve cirular import +# from .interfaces import SECoP_BASE_CLASSES +# WORKAROUND: +SECoP_BASE_CLASSES = ['Readable', 'Writable', 'Drivable', 'Communicator'] + +Done = UniqueObject('Done') +"""a special return value for a read_/write_ method + +indicating that the setter is triggered already""" + +wrapperClasses = {} + + +class HasAccessibles(HasProperties): + """base class of Module + + joining the class's properties, parameters and commands dicts with + those of base classes. + wrap read_*/write_* methods + (so the dispatcher will get notified of changed values) + """ + isWrapped = False + checkedMethods = set() + + @classmethod + def __init_subclass__(cls): # pylint: disable=too-many-branches + super().__init_subclass__() + if cls.isWrapped: + return + # merge accessibles from all sub-classes, treat overrides + # for now, allow to use also the old syntax (parameters/commands dict) + accessibles = OrderedDict() # dict of accessibles + merged_properties = {} # dict of dict of merged properties + new_names = [] # list of names of new accessibles + override_values = {} # bare values overriding a parameter and methods overriding a command + + for base in reversed(cls.__mro__): + for key, value in base.__dict__.items(): + if isinstance(value, Accessible): + value.updateProperties(merged_properties.setdefault(key, {})) + if base == cls and key not in accessibles: + new_names.append(key) + accessibles[key] = value + override_values.pop(key, None) + elif key in accessibles: + override_values[key] = value + for aname, aobj in list(accessibles.items()): + if aname in override_values: + aobj = aobj.copy() + value = override_values[aname] + if value is None: + accessibles.pop(aname) + continue + aobj.merge(merged_properties[aname]) + aobj.override(value) + # replace the bare value by the created accessible + setattr(cls, aname, aobj) + else: + aobj.merge(merged_properties[aname]) + accessibles[aname] = aobj + + # rebuild order: (1) inherited items, (2) items from paramOrder, (3) new accessibles + # move (2) to the end + paramOrder = cls.__dict__.get('paramOrder', ()) + for aname in paramOrder: + if aname in accessibles: + accessibles.move_to_end(aname) + # ignore unknown names + # move (3) to the end + for aname in new_names: + if aname not in paramOrder: + accessibles.move_to_end(aname) + # note: for python < 3.6 the order of inherited items is not ensured between + # declarations within the same class + cls.accessibles = accessibles + + cls.wrappedAttributes = {'isWrapped': True} + # create wrappers for access methods + wrapped_name = '_' + cls.__name__ + for pname, pobj in accessibles.items(): + # wrap of reading/writing funcs + if not isinstance(pobj, Parameter): + # nothing to do for Commands + continue + + rname = 'read_' + pname + rfunc = getattr(cls, rname, None) + # create wrapper + if rfunc: + + def new_rfunc(self, pname=pname, rfunc=rfunc): + with self.accessLock: + try: + value = rfunc(self) + self.log.debug("read_%s returned %r", pname, value) + if value is Done: # TODO: to be removed when all code using Done is updated + return getattr(self, pname) + pobj = self.accessibles[pname] + value = pobj.datatype(value) + except Exception as e: + self.log.debug("read_%s failed with %r", pname, e) + if isinstance(e, SECoPError): + e.raising_methods.append(f'{self.name}.read_{pname}') + self.announceUpdate(pname, err=e) + raise + self.announceUpdate(pname, value, validate=False) + return value + + new_rfunc.poll = getattr(rfunc, 'poll', True) + else: + + def new_rfunc(self, pname=pname): + return getattr(self, pname) + + new_rfunc.poll = False + + new_rfunc.__name__ = rname + new_rfunc.__qualname__ = wrapped_name + '.' + rname + new_rfunc.__module__ = cls.__module__ + cls.wrappedAttributes[rname] = new_rfunc + + cname = 'check_' + pname + for postfix in ('_limits', '_min', '_max'): + limname = pname + postfix + if limname in accessibles: + # find the base class, where the parameter is defined first. + # we have to check all bases, as they may not be treated yet when + # not inheriting from HasAccessibles + base = next(b for b in reversed(cls.__mro__) if limname in b.__dict__) + if cname not in base.__dict__: + # there is no check method yet at this class + # add check function to the class where the limit was defined + setattr(base, cname, lambda self, value, pname=pname: self.checkLimits(value, pname)) + + cfuncs = tuple(filter(None, (b.__dict__.get(cname) for b in cls.__mro__))) + wname = 'write_' + pname + wfunc = getattr(cls, wname, None) + if wfunc or not pobj.readonly: + # allow write method even when parameter is readonly, but internally writable + + def new_wfunc(self, value, pname=pname, wfunc=wfunc, check_funcs=cfuncs): + with self.accessLock: + self.log.debug('validate %r to datatype of %r', value, pname) + validate = self.parameters[pname].datatype.validate + try: + new_value = validate(value) + for c in check_funcs: + if c(self, value): + break + if wfunc: + new_value = wfunc(self, new_value) + self.log.debug('write_%s(%r) returned %r', pname, value, new_value) + if new_value is Done: # TODO: to be removed when all code using Done is updated + return getattr(self, pname) + new_value = value if new_value is None else validate(new_value) + except Exception as e: + if isinstance(e, SECoPError): + e.raising_methods.append(f'{self.name}.write_{pname}') + self.announceUpdate(pname, err=e) + raise + self.announceUpdate(pname, new_value, validate=False) + return new_value + + new_wfunc.__name__ = wname + new_wfunc.__qualname__ = wrapped_name + '.' + wname + new_wfunc.__module__ = cls.__module__ + cls.wrappedAttributes[wname] = new_wfunc + + cls.checkedMethods.update(cls.wrappedAttributes) + + # check for programming errors + for attrname in dir(cls): + prefix, _, pname = attrname.partition('_') + if not pname: + continue + if prefix == 'do': + raise ProgrammingError(f'{cls.__name__!r}: old style command {attrname!r} not supported anymore') + if prefix in ('read', 'write') and attrname not in cls.checkedMethods: + raise ProgrammingError(f'{cls.__name__}.{attrname} defined, but {pname!r} is no parameter') + + try: + # update Status type + cls.Status = cls.status.datatype.members[0]._enum + except AttributeError: + pass + res = {} + # collect info about properties + for pn, pv in cls.propertyDict.items(): + if pv.settable: + res[pn] = pv + # collect info about parameters and their properties + for param, pobj in cls.accessibles.items(): + res[param] = {} + for pn, pv in pobj.getProperties().items(): + if pv.settable: + res[param][pn] = pv + cls.configurables = res + + def __new__(cls, *args, **kwds): + wrapper_class = wrapperClasses.get(cls) + if wrapper_class is None: + wrapper_class = type('_' + cls.__name__, (cls,), cls.wrappedAttributes) + wrapperClasses[cls] = wrapper_class + return super().__new__(wrapper_class) + + +class Feature(HasAccessibles): + """all things belonging to a small, predefined functionality influencing the working of a module + + a mixin with Feature as a direct base class is recognized as a SECoP feature + and reported in the module property 'features' + """ + + +class PollInfo: + def __init__(self, pollinterval, trigger_event): + self.interval = pollinterval + self.last_main = 0 + self.last_slow = 0 + self.pending_errors = set() + self.polled_parameters = [] + self.fast_flag = False + self.trigger_event = trigger_event + + def trigger(self, immediate=False): + """trigger a recalculation of poll due times + + :param immediate: when True, doPoll should be called as soon as possible + """ + if immediate: + self.last_main = 0 + self.trigger_event.set() + + def update_interval(self, pollinterval): + if not self.fast_flag: + self.interval = pollinterval + self.trigger() + + +class Module(HasAccessibles): + """basic module + + all SECoP modules derive from this. + + :param name: the modules name + :param logger: a logger instance + :param cfgdict: the dict from this modules section in the config file + :param srv: the server instance + + Notes: + + - the programmer normally should not need to reimplement :meth:`__init__` + - within modules, parameters should only be addressed as ``self.``, + i.e. ``self.value``, ``self.target`` etc... + + - these are accessing the cached version. + - they can also be written to, generating an async update + + - if you want to 'update from the hardware', call ``self.read_()`` instead + + - the return value of this method will be used as the new cached value and + be an async update sent automatically. + + - if you want to 'update the hardware' call ``self.write_()``. + + - The return value of this method will also update the cache. + + """ + # static properties, definitions in derived classes should overwrite earlier ones. + # note: properties don't change after startup and are usually filled + # with data from a cfg file... + # note: only the properties predefined here are allowed to be set in the cfg file + export = Property('flag if this module is to be exported', BoolType(), default=True, export=False) + group = Property('optional group the module belongs to', StringType(), default='', extname='group') + description = Property('description of the module', TextType(), extname='description', mandatory=True) + meaning = Property('optional meaning indicator', TupleOf(StringType(), IntRange(0, 50)), + default=('', 0), extname='meaning') + visibility = Property('optional visibility hint', EnumType('visibility', user=1, advanced=2, expert=3), + default='user', extname='visibility') + implementation = Property('internal name of the implementation class of the module', StringType(), + extname='implementation') + interface_classes = Property('offical highest interface-class of the module', ArrayOf(StringType()), + extname='interface_classes') + features = Property('list of features', ArrayOf(StringType()), extname='features') + pollinterval = Property('poll interval for parameters handled by doPoll', FloatRange(0.1, 120), default=5) + slowinterval = Property('poll interval for other parameters', FloatRange(0.1, 120), default=15) + omit_unchanged_within = Property('default for minimum time between updates of unchanged values', + NoneOr(FloatRange(0)), export=False, default=None) + enablePoll = True + + pollInfo = None + triggerPoll = None # trigger event for polls. used on io modules and modules without io + + def __init__(self, name, logger, cfgdict, srv): + # remember the secnode for interacting with other modules and the + # server + self.secNode = srv.secnode + self.log = logger + self.name = name + self.valueCallbacks = {} + self.errorCallbacks = {} + self.earlyInitDone = False + self.initModuleDone = False + self.startModuleDone = False + self.remoteLogHandler = None + self.accessLock = threading.RLock() # for read_* / write_* methods + self.updateLock = threading.RLock() # for announceUpdate + self.polledModules = [] # modules polled by thread started in self.startModules + self.attachedModules = {} + self.errors = [] + self._isinitialized = False + self.updateCallback = srv.dispatcher.announce_update + + # handle module properties + # 1) make local copies of properties + super().__init__() + + # conversion from exported names to internal attribute names + self.accessiblename2attr = {} + self.writeDict = {} # values of parameters to be written + # properties, parameters and commands are auto-merged upon subclassing + self.parameters = {} + self.commands = {} + + # 2) check and apply properties specified in cfgdict as + # ' = ' + # pylint: disable=consider-using-dict-items + for key in self.propertyDict: + value = cfgdict.pop(key, None) + if value is not None: + try: + if isinstance(value, dict): + self.setProperty(key, value['value']) + else: + self.setProperty(key, value) + except BadValueError: + self.errors.append(f'{key}: value {value!r} does not match {self.propertyDict[key].datatype!r}!') + + # 3) set automatic properties + mycls, = self.__class__.__bases__ # skip the wrapper class + myclassname = f'{mycls.__module__}.{mycls.__name__}' + self.implementation = myclassname + + # list of only the 'highest' secop module class + self.interface_classes = [ + b.__name__ for b in mycls.__mro__ if b.__name__ in SECoP_BASE_CLASSES][:1] + + # handle Features + self.features = [b.__name__ for b in mycls.__mro__ if Feature in b.__bases__] + + # handle accessibles + # 1) make local copies of parameter objects + # they need to be individual per instance since we use them also + # to cache the current value + qualifiers... + # do not re-use self.accessibles as this is the same for all instances + accessibles = self.accessibles + self.accessibles = {} + for aname, aobj in accessibles.items(): + # make a copy of the Parameter/Command object + aobj = aobj.copy() + acfg = cfgdict.pop(aname, None) + self._add_accessible(aname, aobj, cfg=acfg) + + # 3) complain about names not found as accessible or property names + if cfgdict: + self.errors.append( + f"{', '.join(cfgdict.keys())} does not exist (use one of" + f" {', '.join(list(self.accessibles) + list(self.propertyDict))})") + + # 5) ensure consistency of all accessibles added here + for aobj in self.accessibles.values(): + aobj.finish(self) + + # Modify units AFTER applying the cfgdict + mainvalue = self.parameters.get('value') + if mainvalue: + mainunit = mainvalue.datatype.unit + if mainunit: + self.applyMainUnit(mainunit) + + # 6) check complete configuration of * properties + if not self.errors: + try: + self.checkProperties() + except ConfigError as e: + self.errors.append(str(e)) + for aname, aobj in self.accessibles.items(): + try: + aobj.checkProperties() + except (ConfigError, ProgrammingError) as e: + self.errors.append(f'{aname}: {e}') + if self.errors: + raise ConfigError(self.errors) + + # helper cfg-editor + def __iter__(self): + return self.accessibles.__iter__() + + def __getitem__(self, item): + return self.accessibles.__getitem__(item) + + def applyMainUnit(self, mainunit): + """replace $ in units of parameters by mainunit""" + for pobj in self.parameters.values(): + pobj.datatype.set_main_unit(mainunit) + + def _add_accessible(self, name, accessible, cfg=None): + if self.startModuleDone: + raise ProgrammingError('Accessibles can only be added before startModule()!') + if not self.export: # do not export parameters of a module not exported + accessible.export = False + self.accessibles[name] = accessible + if accessible.export: + self.accessiblename2attr[accessible.export] = name + if isinstance(accessible, Parameter): + self.parameters[name] = accessible + if isinstance(accessible, Command): + self.commands[name] = accessible + if cfg: + try: + for propname, propvalue in cfg.items(): + accessible.setProperty(propname, propvalue) + except KeyError: + self.errors.append(f"'{name}' has no property '{propname}'") + except BadValueError as e: + self.errors.append(f'{name}.{propname}: {str(e)}') + if isinstance(accessible, Parameter): + self._handle_writes(name, accessible) + + def _handle_writes(self, pname, pobj): + """ register value for writing, if given + apply default when no value is given (in cfg or as Parameter argument) + or complain, when cfg is needed + """ + self.valueCallbacks[pname] = [] + self.errorCallbacks[pname] = [] + if isinstance(pobj, Limit): + basepname = pname.rpartition('_')[0] + baseparam = self.parameters.get(basepname) + if not baseparam: + self.errors.append(f'limit {pname!r} is given, but not {basepname!r}') + return + if baseparam.datatype is None: + return # an error will be reported on baseparam + pobj.set_datatype(baseparam.datatype) + if not pobj.hasDatatype(): + self.errors.append(f'{pname} needs a datatype') + return + if pobj.value is None: + if pobj.needscfg: + self.errors.append(f'{pname!r} has no default value and was not given in config!') + if pobj.default is None: + # we do not want to call the setter for this parameter for now, + # this should happen on the first read + pobj.readerror = ConfigError(f'parameter {pname!r} not initialized') + # above error will be triggered on activate after startup, + # when not all hardware parameters are read because of startup timeout + pobj.default = pobj.datatype.default + pobj.value = pobj.default + else: + # value given explicitly, either by cfg or as Parameter argument + pobj.given = True # for PersistentMixin + if hasattr(self, 'write_' + pname): + self.writeDict[pname] = pobj.value + if pobj.default is None: + pobj.default = pobj.value + # this checks again for datatype and sets the timestamp + setattr(self, pname, pobj.value) + + def announceUpdate(self, pname, value=None, err=None, timestamp=None, validate=True): + """announce a changed value or readerror + + :param pname: parameter name + :param value: new value or None in case of error + :param err: None or an exception + :param timestamp: a timestamp or None for taking current time + :param validate: True: convert to datatype, in case of error store in readerror + :return: + + when err=None and validate=False, the value must already be converted to the datatype + """ + + with self.updateLock: + pobj = self.parameters[pname] + timestamp = timestamp or time.time() + if not err: + try: + if validate: + value = pobj.datatype(value) + except Exception as e: + err = e + else: + changed = pobj.value != value + # store the value even in case of error + pobj.value = value + if err: + if secop_error(err) == pobj.readerror: + err.report_error = False + return # no updates for repeated errors + err = secop_error(err) + elif not changed and timestamp < (pobj.timestamp or 0) + pobj.omit_unchanged_within: + # no change within short time -> omit + return + pobj.timestamp = timestamp or time.time() + if err: + callbacks = self.errorCallbacks + pobj.readerror = arg = err + else: + callbacks = self.valueCallbacks + arg = value + pobj.readerror = None + if pobj.export: + self.updateCallback(self.name, pname, pobj) + cblist = callbacks[pname] + for cb in cblist: + try: + cb(arg) + except Exception: + # print(formatExtendedTraceback()) + pass + + def registerCallbacks(self, modobj, autoupdate=()): + """register callbacks to another module + + - whenever a self. changes: + .update_ is called with the new value as argument. + If this method raises an exception, . gets into an error state. + If the method does not exist and is in autoupdate, + . is updated to self. + - whenever . gets into an error state: + .error_update_ is called with the exception as argument. + If this method raises an error, . gets into an error state. + If this method does not exist, and is in autoupdate, + . gets into the same error state as self. + """ + for pname in self.parameters: + errfunc = getattr(modobj, 'error_update_' + pname, None) + if errfunc: + def errcb(err, p=pname, efunc=errfunc): + try: + efunc(err) + except Exception as e: + modobj.announceUpdate(p, err=e) + self.errorCallbacks[pname].append(errcb) + else: + def errcb(err, p=pname): + modobj.announceUpdate(p, err=err) + if pname in autoupdate: + self.errorCallbacks[pname].append(errcb) + + updfunc = getattr(modobj, 'update_' + pname, None) + if updfunc: + def cb(value, ufunc=updfunc, efunc=errcb): + try: + ufunc(value) + except Exception as e: + efunc(e) + self.valueCallbacks[pname].append(cb) + elif pname in autoupdate: + def cb(value, p=pname): + modobj.announceUpdate(p, value) + self.valueCallbacks[pname].append(cb) + + def isBusy(self, status=None): + """helper function for treating substates of BUSY correctly""" + # defined even for non drivable (used for dynamic polling) + return False + + def earlyInit(self): + """initialise module with stuff to be done before all modules are created""" + self.earlyInitDone = True + + def initModule(self): + """initialise module with stuff to be done after all modules are created""" + self.initModuleDone = True + if self.enablePoll or self.writeDict: + # enablePoll == False: we still need the poll thread for writing values from writeDict + if hasattr(self, 'io'): + self.io.polledModules.append(self) + else: + self.triggerPoll = threading.Event() + self.polledModules.append(self) + + def startModule(self, start_events): + """runs after init of all modules + + when a thread is started, a trigger function may signal that it + has finished its initial work + start_events.get_trigger() creates such a trigger and + registers it in the server for waiting + defaults to 30 seconds + """ + # we do not need self.errors any longer. should we delete it? + # del self.errors + if self.polledModules: + mkthread(self.__pollThread, self.polledModules, start_events.get_trigger()) + self.startModuleDone = True + + def initialReads(self): + """initial reads to be done + + override to read initial values from HW, when it is not desired + to poll them afterwards + + called from the poll thread, after writeInitParams but before + all parameters are polled once + """ + + def shutdownModule(self): + """called when the sever shuts down + + any cleanup-work should be performed here, like closing threads and + saving data. + """ + + def doPoll(self): + """polls important parameters like value and status + + all other parameters are polled automatically + """ + + def setFastPoll(self, flag, fast_interval=0.25): + """change poll interval + + :param flag: enable/disable fast poll mode + :param fast_interval: fast poll interval + """ + if self.pollInfo: + self.pollInfo.fast_flag = flag + self.pollInfo.interval = fast_interval if flag else self.pollinterval + self.pollInfo.trigger() + + def callPollFunc(self, rfunc, raise_com_failed=False): + """call read method with proper error handling""" + try: + rfunc() + if rfunc.__name__ in self.pollInfo.pending_errors: + self.log.info('%s: o.k.', rfunc.__name__) + self.pollInfo.pending_errors.discard(rfunc.__name__) + except Exception as e: + if getattr(e, 'report_error', True): + name = rfunc.__name__ + self.pollInfo.pending_errors.add(name) # trigger o.k. message after error is resolved + if isinstance(e, SECoPError): + e.raising_methods.append(name) + if e.silent: + self.log.debug('%s', e.format(False)) + else: + self.log.error('%s', e.format(False)) + if raise_com_failed and isinstance(e, CommunicationFailedError): + raise + else: + # not a SECoPError: this is proabably a programming error + # we want to log the traceback + self.log.error('%s', formatException()) + + def __pollThread(self, modules, started_callback): + """poll thread body + + :param modules: list of modules to be handled by this thread + :param started_callback: to be called after all polls are done once + + before polling, parameters which need hardware initialisation are written + """ + polled_modules = [m for m in modules if m.enablePoll] + if hasattr(self, 'registerReconnectCallback'): + # self is a communicator supporting reconnections + def trigger_all(trg=self.triggerPoll, polled_modules=polled_modules): + for m in polled_modules: + m.pollInfo.last_main = 0 + m.pollInfo.last_slow = 0 + trg.set() + self.registerReconnectCallback('trigger_polls', trigger_all) + + # collect all read functions + for mobj in polled_modules: + pinfo = mobj.pollInfo = PollInfo(mobj.pollinterval, self.triggerPoll) + # trigger a poll interval change when self.pollinterval changes. + if 'pollinterval' in mobj.valueCallbacks: + mobj.valueCallbacks['pollinterval'].append(pinfo.update_interval) + + for pname, pobj in mobj.parameters.items(): + rfunc = getattr(mobj, 'read_' + pname) + if rfunc.poll: + pinfo.polled_parameters.append((mobj, rfunc, pobj)) + while True: + try: + for mobj in modules: + # TODO when needed: here we might add a call to a method :meth:`beforeWriteInit` + mobj.writeInitParams() + mobj.initialReads() + # call all read functions a first time + for m in polled_modules: + for mobj, rfunc, _ in m.pollInfo.polled_parameters: + mobj.callPollFunc(rfunc, raise_com_failed=True) + # TODO when needed: here we might add calls to a method :meth:`afterInitPolls` + break + except CommunicationFailedError as e: + # when communication failed, probably all parameters and may be more modules are affected. + # as this would take a lot of time (summed up timeouts), we do not continue + # trying and let the server accept connections, further polls might success later + if started_callback: + self.log.error('communication failure on startup: %s', e) + started_callback() + started_callback = None + self.triggerPoll.wait(0.1) # wait for reconnection or max 10 sec. + break + if started_callback: + started_callback() + if not polled_modules: # no polls needed - exit thread + return + to_poll = () + while True: + now = time.time() + wait_time = 999 + for mobj in modules: + pinfo = mobj.pollInfo + wait_time = min(pinfo.last_main + pinfo.interval - now, wait_time, + pinfo.last_slow + mobj.slowinterval - now) + if wait_time > 0 and not to_poll: + # nothing to do + self.triggerPoll.wait(wait_time) + self.triggerPoll.clear() + continue + # call doPoll of all modules where due + for mobj in modules: + pinfo = mobj.pollInfo + if now > pinfo.last_main + pinfo.interval: + try: + pinfo.last_main = (now // pinfo.interval) * pinfo.interval + except ZeroDivisionError: + pinfo.last_main = now + mobj.callPollFunc(mobj.doPoll) + now = time.time() + # find ONE due slow poll and call it + loop = True + while loop: # loops max. 2 times, when to_poll is at end + for mobj, rfunc, pobj in to_poll: + if now > pobj.timestamp + mobj.slowinterval * 0.5: + mobj.callPollFunc(rfunc) + loop = False # one poll done + break + else: + to_poll = [] + # collect due slow polls + for mobj in modules: + pinfo = mobj.pollInfo + if now > pinfo.last_slow + mobj.slowinterval: + to_poll.extend(pinfo.polled_parameters) + pinfo.last_slow = (now // mobj.slowinterval) * mobj.slowinterval + if to_poll: + to_poll = iter(to_poll) + else: + loop = False # no slow polls ready + + def writeInitParams(self): + """write values for parameters with configured values + + - does proper error handling + + called at the beginning of the poller thread and for writing persistent values + """ + for pname in list(self.writeDict): + value = self.writeDict.pop(pname, Done) + # in the mean time, a poller or handler might already have done it + if value is not Done: + wfunc = getattr(self, 'write_' + pname, None) + if wfunc is None: + setattr(self, pname, value) + else: + try: + self.log.debug('initialize parameter %s', pname) + wfunc(value) + except SECoPError as e: + if e.silent: + self.log.debug('%s: %s', pname, str(e)) + else: + self.log.error('%s: %s', pname, str(e)) + except Exception: + self.log.error(formatException()) + + def setRemoteLogging(self, conn, level, send_log): + if self.remoteLogHandler is None: + for handler in self.log.handlers: + if isinstance(handler, RemoteLogHandler): + handler.send_log = send_log + self.remoteLogHandler = handler + break + else: + raise ValueError('remote handler not found') + self.remoteLogHandler.set_conn_level(self.name, conn, level) + + def checkLimits(self, value, pname='target'): + """check for limits + + :param value: the value to be checked for _min <= value <= _max + :param pname: parameter name, default is 'target' + + raises RangeError in case the value is not valid + + This method is called automatically and needs therefore rarely to be + called by the programmer. It might be used in a check_ method, + when no automatic super call is desired. + """ + try: + min_, max_ = getattr(self, pname + '_limits') + if not min_ <= value <= max_: + raise RangeError(f'{pname} outside {pname}_limits') + return + except AttributeError: + pass + min_ = getattr(self, pname + '_min', float('-inf')) + max_ = getattr(self, pname + '_max', float('inf')) + if min_ > max_: + raise RangeError(f'invalid limits: {pname}_min > {pname}_max') + if value < min_: + raise RangeError(f'{pname} below {pname}_min') + if value > max_: + raise RangeError(f'{pname} above {pname}_max') diff --git a/frappy/modules.py b/frappy/modules.py index fc57d1d..850b06f 100644 --- a/frappy/modules.py +++ b/frappy/modules.py @@ -940,8 +940,12 @@ class Attached(Property): def __get__(self, obj, owner): if obj is None: return self - if self.name not in obj.attachedModules: - modobj = obj.DISPATCHER.get_module(super().__get__(obj, owner)) + modobj = obj.attachedModules.get(self.name) + if not modobj: + modulename = super().__get__(obj, owner) + if not modulename: + return None # happens when mandatory=False and modulename is not given + modobj = obj.secNode.get_module(modulename) if not isinstance(modobj, self.basecls): raise ConfigError(f'attached module {self.name}={modobj.name!r} '\ f'must inherit from {self.basecls.__qualname__!r}') diff --git a/frappy/persistent.py b/frappy/persistent.py index 03e2499..3477801 100644 --- a/frappy/persistent.py +++ b/frappy/persistent.py @@ -78,7 +78,7 @@ class PersistentMixin(Module): super().__init__(name, logger, cfgdict, srv) persistentdir = os.path.join(generalConfig.logdir, 'persistent') os.makedirs(persistentdir, exist_ok=True) - self.persistentFile = os.path.join(persistentdir, f'{self.DISPATCHER.equipment_id}.{self.name}.json') + self.persistentFile = os.path.join(persistentdir, f'{self.secNode.equipment_id}.{self.name}.json') self.initData = {} # "factory" settings loaded = self.loadPersistentData() for pname in self.parameters: diff --git a/frappy/protocol/dispatcher.py b/frappy/protocol/dispatcher.py index 1de1113..85f7449 100644 --- a/frappy/protocol/dispatcher.py +++ b/frappy/protocol/dispatcher.py @@ -18,6 +18,7 @@ # Module authors: # Enrico Faulhaber # Markus Zolliker +# Alexander Zaft # # ***************************************************************************** """Dispatcher for SECoP Messages @@ -29,28 +30,18 @@ Interface to the service offering part: on the connectionobj or on activated connections - 'add_connection(connectionobj)' registers new connection - 'remove_connection(connectionobj)' removes now longer functional connection - -Interface to the modules: - - add_module(modulename, moduleobj, export=True) registers a new module under the - given name, may also register it for exporting (making accessible) - - get_module(modulename) returns the requested module or None - - remove_module(modulename_or_obj): removes the module (during shutdown) - """ import threading -import traceback -from collections import OrderedDict from time import time as currenttime from frappy.errors import NoSuchCommandError, NoSuchModuleError, \ - NoSuchParameterError, ProtocolError, ReadOnlyError, ConfigError + NoSuchParameterError, ProtocolError, ReadOnlyError from frappy.params import Parameter from frappy.protocol.messages import COMMANDREPLY, DESCRIPTIONREPLY, \ DISABLEEVENTSREPLY, ENABLEEVENTSREPLY, ERRORPREFIX, EVENTREPLY, \ - HEARTBEATREPLY, IDENTREPLY, IDENTREQUEST, READREPLY, WRITEREPLY, \ - LOGGING_REPLY, LOG_EVENT -from frappy.lib import get_class + HEARTBEATREPLY, IDENTREPLY, IDENTREQUEST, LOG_EVENT, LOGGING_REPLY, \ + READREPLY, WRITEREPLY def make_update(modulename, pobj): @@ -71,10 +62,7 @@ class Dispatcher: self.nodeprops[k] = options.pop(k) self.log = logger - # map ALL modulename -> moduleobj - self._modules = {} - # list of EXPORTED modules - self._export = [] + self.secnode = srv.secnode # list all connections self._connections = [] # active (i.e. broadcast-receiving) connections @@ -88,11 +76,6 @@ class Dispatcher: self.shutdown = srv.shutdown # handle to server self.srv = srv - # set of modules that failed creation - self.failed_modules = set() - # list of errors that occured during initialization - self.errors = [] - self.traceback_counter = 0 def broadcast_event(self, msg, reallyall=False): """broadcasts a msg to all active connections @@ -148,163 +131,8 @@ class Dispatcher: self._connections.remove(conn) self.reset_connection(conn) - def register_module(self, moduleobj, modulename, export=True): - self.log.debug('registering module %r as %s (export=%r)', - moduleobj, modulename, export) - self._modules[modulename] = moduleobj - if export: - self._export.append(modulename) - - def get_module(self, modulename): - """ Returns a fully initialized module. Or None, if something went - wrong during instatiating/initializing the module.""" - modobj = self.get_module_instance(modulename) - if modobj is None: - return None - if modobj._isinitialized: - return modobj - - # also call earlyInit on the modules - self.log.debug('initializing module %r', modulename) - try: - modobj.earlyInit() - if not modobj.earlyInitDone: - self.errors.append(f'{modobj.earlyInit.__qualname__} was not called, probably missing super call') - modobj.initModule() - if not modobj.initModuleDone: - self.errors.append(f'{modobj.initModule.__qualname__} was not called, probably missing super call') - except Exception as e: - if self.traceback_counter == 0: - self.log.exception(traceback.format_exc()) - self.traceback_counter += 1 - self.errors.append(f'error initializing {modulename}: {e!r}') - modobj._isinitialized = True - self.log.debug('initialized module %r', modulename) - return modobj - - def get_module_instance(self, modulename): - """ Returns the module in its current initialization state or creates a - new uninitialized modle to return. - - When creating a new module, srv.module_config is accessed to get the - modules configuration. - """ - if modulename in self._modules: - return self._modules[modulename] - if modulename in list(self._modules.values()): - # it's actually already the module object - return modulename - # create module from srv.module_cfg, store and return - self.log.debug('attempting to create module %r', modulename) - - opts = self.srv.module_cfg.get(modulename, None) - if opts is None: - raise NoSuchModuleError(f'Module {modulename!r} does not exist on this SEC-Node!') - pymodule = None - try: # pylint: disable=no-else-return - classname = opts.pop('cls') - if isinstance(classname, str): - pymodule = classname.rpartition('.')[0] - if pymodule in self.failed_modules: - # creation has failed already once, do not try again - return None - cls = get_class(classname) - else: - pymodule = classname.__module__ - if pymodule in self.failed_modules: - # creation has failed already once, do not try again - return None - cls = classname - except Exception as e: - if str(e) == 'no such class': - self.errors.append(f'{classname} not found') - else: - self.failed_modules.add(pymodule) - if self.traceback_counter == 0: - self.log.exception(traceback.format_exc()) - self.traceback_counter += 1 - self.errors.append(f'error importing {classname}') - return None - else: - try: - modobj = cls(modulename, self.log.getChild(modulename), opts, self.srv) - except ConfigError as e: - self.errors.append(f'error creating module {modulename}:') - for errtxt in e.args[0] if isinstance(e.args[0], list) else [e.args[0]]: - self.errors.append(' ' + errtxt) - modobj = None - except Exception as e: - if self.traceback_counter == 0: - self.log.exception(traceback.format_exc()) - self.traceback_counter += 1 - self.errors.append(f'error creating {modulename}') - modobj = None - if modobj: - self.register_module(modobj, modulename, modobj.export) - self.srv.modules[modulename] = modobj # IS HERE THE CORRECT PLACE? - return modobj - - def remove_module(self, modulename_or_obj): - moduleobj = self.get_module(modulename_or_obj) - modulename = moduleobj.name - if modulename in self._export: - self._export.remove(modulename) - self._modules.pop(modulename) - self._subscriptions.pop(modulename, None) - for k in [kk for kk in self._subscriptions if kk.startswith(f'{modulename}:')]: - self._subscriptions.pop(k, None) - - def list_module_names(self): - # return a copy of our list - return self._export[:] - - def export_accessibles(self, modulename): - self.log.debug('export_accessibles(%r)', modulename) - if modulename in self._export: - # omit export=False params! - res = OrderedDict() - for aobj in self.get_module(modulename).accessibles.values(): - if aobj.export: - res[aobj.export] = aobj.for_export() - self.log.debug('list accessibles for module %s -> %r', - modulename, res) - return res - self.log.debug('-> module is not to be exported!') - return OrderedDict() - - def get_descriptive_data(self, specifier): - """returns a python object which upon serialisation results in the descriptive data""" - specifier = specifier or '' - modules = {} - result = {'modules': modules} - for modulename in self._export: - module = self.get_module(modulename) - if not module.export: - continue - # some of these need rework ! - mod_desc = {'accessibles': self.export_accessibles(modulename)} - mod_desc.update(module.exportProperties()) - mod_desc.pop('export', False) - modules[modulename] = mod_desc - modname, _, pname = specifier.partition(':') - if modname in modules: # extension to SECoP standard: description of a single module - result = modules[modname] - if pname in result['accessibles']: # extension to SECoP standard: description of a single accessible - # command is also accepted - result = result['accessibles'][pname] - elif pname: - raise NoSuchParameterError(f'Module {modname!r} has no parameter {pname!r}') - elif not modname or modname == '.': - result['equipment_id'] = self.equipment_id - result['firmware'] = 'FRAPPY - The Python Framework for SECoP' - result['version'] = '2021.02' - result.update(self.nodeprops) - else: - raise NoSuchModuleError(f'Module {modname!r} does not exist') - return result - def _execute_command(self, modulename, exportedname, argument=None): - moduleobj = self.get_module(modulename) + moduleobj = self.secnode.get_module(modulename) if moduleobj is None: raise NoSuchModuleError(f'Module {modulename!r} does not exist') @@ -323,7 +151,7 @@ class Dispatcher: return result, {'t': currenttime()} def _setParameterValue(self, modulename, exportedname, value): - moduleobj = self.get_module(modulename) + moduleobj = self.secnode.get_module(modulename) if moduleobj is None: raise NoSuchModuleError(f'Module {modulename!r} does not exist') @@ -344,7 +172,7 @@ class Dispatcher: return pobj.export_value(), {'t': pobj.timestamp} if pobj.timestamp else {} def _getParameterValue(self, modulename, exportedname): - moduleobj = self.get_module(modulename) + moduleobj = self.secnode.get_module(modulename) if moduleobj is None: raise NoSuchModuleError(f'Module {modulename!r} does not exist') @@ -401,7 +229,7 @@ class Dispatcher: return (IDENTREPLY, None, None) def handle_describe(self, conn, specifier, data): - return (DESCRIPTIONREPLY, specifier or '.', self.get_descriptive_data(specifier)) + return (DESCRIPTIONREPLY, specifier or '.', self.secnode.get_descriptive_data(specifier)) def handle_read(self, conn, specifier, data): if data: @@ -440,9 +268,9 @@ class Dispatcher: modulename, exportedname = specifier, None if ':' in specifier: modulename, exportedname = specifier.split(':', 1) - if modulename not in self._export: + if modulename not in self.secnode.export: raise NoSuchModuleError(f'Module {modulename!r} does not exist') - moduleobj = self.get_module(modulename) + moduleobj = self.secnode.get_module(modulename) if exportedname is not None: pname = moduleobj.accessiblename2attr.get(exportedname, True) if pname and pname not in moduleobj.accessibles: @@ -456,12 +284,12 @@ class Dispatcher: else: # activate all modules self._active_connections.add(conn) - modules = [(m, None) for m in self._export] + modules = [(m, None) for m in self.secnode.export] # send updates for all subscribed values. # note: The initial poll already happend before the server is active for modulename, pname in modules: - moduleobj = self._modules.get(modulename, None) + moduleobj = self.secnode.modules.get(modulename, None) if pname: conn.send_reply(make_update(modulename, moduleobj.parameters[pname])) continue @@ -485,16 +313,16 @@ class Dispatcher: conn.send_reply((LOG_EVENT, f'{modname}:{level}', msg)) def set_all_log_levels(self, conn, level): - for modobj in self._modules.values(): - modobj.setRemoteLogging(conn, level) + for modobj in self.secnode.modules.values(): + modobj.setRemoteLogging(conn, level, self.send_log_msg) def handle_logging(self, conn, specifier, level): if specifier == '#': self.log.handlers[1].setLevel(int(level)) return LOGGING_REPLY, specifier, level if specifier and specifier != '.': - modobj = self._modules[specifier] - modobj.setRemoteLogging(conn, level) + modobj = self.secnode.modules[specifier] + modobj.setRemoteLogging(conn, level, self.send_log_msg) else: self.set_all_log_levels(conn, level) return LOGGING_REPLY, specifier, level diff --git a/frappy/secnode.py b/frappy/secnode.py new file mode 100644 index 0000000..6b7b91a --- /dev/null +++ b/frappy/secnode.py @@ -0,0 +1,281 @@ +# ***************************************************************************** +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free Software +# Foundation; either version 2 of the License, or (at your option) any later +# version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# this program; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Module authors: +# Alexander Zaft +# +# ***************************************************************************** + +import traceback +from collections import OrderedDict + +from frappy.dynamic import Pinata +from frappy.errors import ConfigError, NoSuchModuleError, NoSuchParameterError +from frappy.lib import get_class + + +class SecNode: + """Managing the modules. + + Interface to the modules: + - add_module(module, modulename) + - get_module(modulename) returns the requested module or None if there is + no suitable configuration on the server + """ + def __init__(self, name, logger, options, srv): + self.equipment_id = options.pop('equipment_id', name) + self.nodeprops = {} + for k in list(options): + self.nodeprops[k] = options.pop(k) + # map ALL modulename -> moduleobj + self.modules = {} + # list of EXPORTED modules + self.export = [] + self.log = logger + self.srv = srv + # set of modules that failed creation + self.failed_modules = set() + # list of errors that occured during initialization + self.errors = [] + self.traceback_counter = 0 + self.name = name + + def get_module(self, modulename): + """ Returns a fully initialized module. Or None, if something went + wrong during instatiating/initializing the module.""" + modobj = self.get_module_instance(modulename) + if modobj is None: + return None + if modobj._isinitialized: + return modobj + + # also call earlyInit on the modules + self.log.debug('initializing module %r', modulename) + try: + modobj.earlyInit() + if not modobj.earlyInitDone: + self.errors.append(f'{modobj.earlyInit.__qualname__} was not ' + f'called, probably missing super call') + modobj.initModule() + if not modobj.initModuleDone: + self.errors.append(f'{modobj.initModule.__qualname__} was not ' + f'called, probably missing super call') + except Exception as e: + if self.traceback_counter == 0: + self.log.exception(traceback.format_exc()) + self.traceback_counter += 1 + self.errors.append(f'error initializing {modulename}: {e!r}') + modobj._isinitialized = True + self.log.debug('initialized module %r', modulename) + return modobj + + def get_module_instance(self, modulename): + """ Returns the module in its current initialization state or creates a + new uninitialized modle to return. + + When creating a new module, srv.module_config is accessed to get the + modules configuration. + """ + if modulename in self.modules: + return self.modules[modulename] + if modulename in list(self.modules.values()): + # it's actually already the module object + return modulename + # create module from srv.module_cfg, store and return + self.log.debug('attempting to create module %r', modulename) + + opts = self.srv.module_cfg.get(modulename, None) + if opts is None: + raise NoSuchModuleError(f'Module {modulename!r} does not exist on ' + f'this SEC-Node!') + pymodule = None + try: # pylint: disable=no-else-return + classname = opts.pop('cls') + if isinstance(classname, str): + pymodule = classname.rpartition('.')[0] + if pymodule in self.failed_modules: + # creation has failed already once, do not try again + return None + cls = get_class(classname) + else: + pymodule = classname.__module__ + if pymodule in self.failed_modules: + # creation has failed already once, do not try again + return None + cls = classname + except Exception as e: + if str(e) == 'no such class': + self.errors.append(f'{classname} not found') + else: + self.failed_modules.add(pymodule) + if self.traceback_counter == 0: + self.log.exception(traceback.format_exc()) + self.traceback_counter += 1 + self.errors.append(f'error importing {classname}') + return None + else: + try: + modobj = cls(modulename, self.log.parent.getChild(modulename), + opts, self.srv) + except ConfigError as e: + self.errors.append(f'error creating module {modulename}:') + for errtxt in e.args[0] if isinstance(e.args[0], list) else [e.args[0]]: + self.errors.append(' ' + errtxt) + modobj = None + except Exception as e: + if self.traceback_counter == 0: + self.log.exception(traceback.format_exc()) + self.traceback_counter += 1 + self.errors.append(f'error creating {modulename}') + modobj = None + if modobj: + self.add_module(modobj, modulename) + return modobj + + def create_modules(self): + self.modules = OrderedDict() + + # create and initialize modules + todos = list(self.srv.module_cfg.items()) + while todos: + modname, options = todos.pop(0) + if modname in self.modules: + # already created via Attached + continue + # For Pinata modules: we need to access this in Self.get_module + self.srv.module_cfg[modname] = dict(options) + modobj = self.get_module_instance(modname) # lazy + if modobj is None: + self.log.debug('Module %s returned None', modname) + continue + self.modules[modname] = modobj + if isinstance(modobj, Pinata): + # scan for dynamic devices + pinata = self.get_module(modname) + pinata_modules = list(pinata.scanModules()) + for name, _cfg in pinata_modules: + if name in self.srv.module_cfg: + self.log.error('Module %s, from pinata %s, already ' + 'exists in config file!', name, modname) + self.log.info('Pinata %s found %d modules', + modname, len(pinata_modules)) + todos.extend(pinata_modules) + + def export_accessibles(self, modulename): + self.log.debug('export_accessibles(%r)', modulename) + if modulename in self.export: + # omit export=False params! + res = OrderedDict() + for aobj in self.get_module(modulename).accessibles.values(): + if aobj.export: + res[aobj.export] = aobj.for_export() + self.log.debug('list accessibles for module %s -> %r', + modulename, res) + return res + self.log.debug('-> module is not to be exported!') + return OrderedDict() + + def get_descriptive_data(self, specifier): + """returns a python object which upon serialisation results in the + descriptive data""" + specifier = specifier or '' + modules = {} + result = {'modules': modules} + for modulename in self.export: + module = self.get_module(modulename) + if not module.export: + continue + # some of these need rework ! + mod_desc = {'accessibles': self.export_accessibles(modulename)} + mod_desc.update(module.exportProperties()) + mod_desc.pop('export', False) + modules[modulename] = mod_desc + modname, _, pname = specifier.partition(':') + if modname in modules: # extension to SECoP standard: description of a single module + result = modules[modname] + if pname in result['accessibles']: # extension to SECoP standard: description of a single accessible + # command is also accepted + result = result['accessibles'][pname] + elif pname: + raise NoSuchParameterError(f'Module {modname!r} ' + f'has no parameter {pname!r}') + elif not modname or modname == '.': + result['equipment_id'] = self.equipment_id + result['firmware'] = 'FRAPPY - The Python Framework for SECoP' + result['version'] = '2021.02' + result.update(self.nodeprops) + else: + raise NoSuchModuleError(f'Module {modname!r} does not exist') + return result + + def add_module(self, module, modulename): + """Adds a named module object to this SecNode.""" + self.modules[modulename] = module + if module.export: + self.export.append(modulename) + + # def remove_module(self, modulename_or_obj): + # moduleobj = self.get_module(modulename_or_obj) + # modulename = moduleobj.name + # if modulename in self.export: + # self.export.remove(modulename) + # self.modules.pop(modulename) + # self._subscriptions.pop(modulename, None) + # for k in [kk for kk in self._subscriptions if kk.startswith(f'{modulename}:')]: + # self._subscriptions.pop(k, None) + + def shutdown_modules(self): + """Call 'shutdownModule' for all modules.""" + for name in self._getSortedModules(): + self.modules[name].shutdownModule() + + def _getSortedModules(self): + """Sort modules topologically by inverse dependency. + + Example: if there is an IO device A and module B depends on it, then + the result will be [B, A]. + Right now, if the dependency graph is not a DAG, we give up and return + the unvisited nodes to be dismantled at the end. + Taken from Introduction to Algorithms [CLRS]. + """ + def go(name): + if name in done: # visiting a node + return True + if name in visited: + visited.add(name) + return False # cycle in dependencies -> fail + visited.add(name) + if name in unmarked: + unmarked.remove(name) + for module in self.modules[name].attachedModules.values(): + res = go(module.name) + if not res: + return False + visited.remove(name) + done.add(name) + l.append(name) + return True + + unmarked = set(self.modules.keys()) # unvisited nodes + visited = set() # visited in DFS, but not completed + done = set() + l = [] # list of sorted modules + + while unmarked: + if not go(unmarked.pop()): + self.log.error('cyclical dependency between modules!') + return l[::-1] + list(visited) + list(unmarked) + return l[::-1] diff --git a/frappy/server.py b/frappy/server.py index 5152728..36aea2a 100644 --- a/frappy/server.py +++ b/frappy/server.py @@ -25,14 +25,13 @@ import os import signal import sys -from collections import OrderedDict from frappy.config import load_config from frappy.errors import ConfigError -from frappy.dynamic import Pinata from frappy.lib import formatException, generalConfig, get_class, mkthread from frappy.lib.multievent import MultiEvent from frappy.params import PREDEFINED_ACCESSIBLES +from frappy.secnode import SecNode try: from daemon import DaemonContext @@ -171,14 +170,13 @@ class Server: # server_close() called by 'with' self.log.info(f'stopped listenning, cleaning up' - f' {len(self.modules)} modules') + f' {len(self.secnode.modules)} modules') # if systemd: # if self._restart: # systemd.daemon.notify('RELOADING=1') # else: # systemd.daemon.notify('STOPPING=1') - for name in self._getSortedModules(): - self.modules[name].shutdownModule() + self.secnode.shutdown_modules() if self._restart: self.restart_hook() self.log.info('restarting') @@ -205,50 +203,27 @@ class Server: errors = [] opts = dict(self.node_cfg) cls = get_class(opts.pop('cls')) - self.dispatcher = cls(opts.pop('name', self._cfgfiles), - self.log.getChild('dispatcher'), opts, self) + name = opts.pop('name', self._cfgfiles) + # TODO: opts not in both + self.secnode = SecNode(name, self.log.getChild('secnode'), opts, self) + self.dispatcher = cls(name, self.log.getChild('dispatcher'), opts, self) if opts: - self.dispatcher.errors.append(self.unknown_options(cls, opts)) - self.modules = OrderedDict() - - # create and initialize modules - todos = list(self.module_cfg.items()) - while todos: - modname, options = todos.pop(0) - if modname in self.modules: - # already created by Dispatcher (via Attached) - continue - # For Pinata modules: we need to access this in Dispatcher.get_module - self.module_cfg[modname] = dict(options) - modobj = self.dispatcher.get_module_instance(modname) # lazy - if modobj is None: - self.log.debug('Module %s returned None', modname) - continue - self.modules[modname] = modobj - if isinstance(modobj, Pinata): - # scan for dynamic devices - pinata = self.dispatcher.get_module(modname) - pinata_modules = list(pinata.scanModules()) - for name, _cfg in pinata_modules: - if name in self.module_cfg: - self.log.error('Module %s, from pinata %s, already' - ' exists in config file!', name, modname) - self.log.info('Pinata %s found %d modules', modname, len(pinata_modules)) - todos.extend(pinata_modules) + self.secnode.errors.append(self.unknown_options(cls, opts)) + self.secnode.create_modules() # initialize all modules by getting them with Dispatcher.get_module, # which is done in the get_descriptive data # TODO: caching, to not make this extra work - self.dispatcher.get_descriptive_data('') + self.secnode.get_descriptive_data('') # =========== All modules are initialized =========== # all errors from initialization process - errors = self.dispatcher.errors + errors = self.secnode.errors if not self._testonly: start_events = MultiEvent(default_timeout=30) - for modname, modobj in self.modules.items(): + for modname, modobj in self.secnode.modules.items(): # startModule must return either a timeout value or None (default 30 sec) start_events.name = f'module {modname}' modobj.startModule(start_events) @@ -275,7 +250,8 @@ class Server: self.log.info('all modules started') history_path = os.environ.get('FRAPPY_HISTORY') if history_path: - from frappy_psi.historywriter import FrappyHistoryWriter # pylint: disable=import-outside-toplevel + from frappy_psi.historywriter import \ + FrappyHistoryWriter # pylint: disable=import-outside-toplevel writer = FrappyHistoryWriter(history_path, PREDEFINED_ACCESSIBLES.keys(), self.dispatcher) # treat writer as a connection self.dispatcher.add_connection(writer) @@ -288,41 +264,3 @@ class Server: # history_path = os.environ.get('ALTERNATIVE_HISTORY') # if history_path: # from frappy_.historywriter import ... etc. - - def _getSortedModules(self): - """Sort modules topologically by inverse dependency. - - Example: if there is an IO device A and module B depends on it, then - the result will be [B, A]. - Right now, if the dependency graph is not a DAG, we give up and return - the unvisited nodes to be dismantled at the end. - Taken from Introduction to Algorithms [CLRS]. - """ - def go(name): - if name in done: # visiting a node - return True - if name in visited: - visited.add(name) - return False # cycle in dependencies -> fail - visited.add(name) - if name in unmarked: - unmarked.remove(name) - for module in self.modules[name].attachedModules.values(): - res = go(module.name) - if not res: - return False - visited.remove(name) - done.add(name) - l.append(name) - return True - - unmarked = set(self.modules.keys()) # unvisited nodes - visited = set() # visited in DFS, but not completed - done = set() - l = [] # list of sorted modules - - while unmarked: - if not go(unmarked.pop()): - self.log.error('cyclical dependency between modules!') - return l[::-1] + list(visited) + list(unmarked) - return l[::-1] diff --git a/frappy_psi/sea.py b/frappy_psi/sea.py index 4554c22..6d4cf4c 100644 --- a/frappy_psi/sea.py +++ b/frappy_psi/sea.py @@ -252,7 +252,7 @@ class SeaClient(ProxyClient, Module): if result == '1': self.asynio.writeline(('get_all_param ' + ' '.join(self.objects)).encode()) else: - self.DISPATCHER.shutdown() + self.secNode.srv.shutdown() try: reply = self.asynio.readline() if reply is None: @@ -316,7 +316,7 @@ class SeaClient(ProxyClient, Module): if path == '/device/changetime': recheck = time.time() + 1 elif path.startswith('/device/frappy_%s' % self.service) and value == '': - self.DISPATCHER.shutdown() + self.secNode.srv.shutdown() else: for module, param in mplist: oldv, oldt, oldr = self.cache.get((module, param), [None, None, None]) @@ -659,7 +659,7 @@ class SeaModule(Module): readerror = secop_error(e) pobj.readerror = readerror if pobj.export: - self.DISPATCHER.broadcast_event(make_update(self.name, pobj)) + self.secNode.srv.dispatcher.broadcast_event(make_update(self.name, pobj)) def initModule(self): self.io.register_obj(self, self.sea_object) diff --git a/test/test_attach.py b/test/test_attach.py index 6b6c251..de41e35 100644 --- a/test/test_attach.py +++ b/test/test_attach.py @@ -57,11 +57,23 @@ class LoggerStub: logger = LoggerStub() +class SecNodeStub: + def __init__(self): + self.modules = {} + + def add_module(self, module, modname): + self.modules[modname] = module + + def get_module(self, modname): + return self.modules[modname] + + class ServerStub: restart = None shutdown = None def __init__(self): + self.secnode = SecNodeStub() self.dispatcher = Dispatcher('dispatcher', logger, {}, self) @@ -73,6 +85,6 @@ def test_attach(): a = Module('a', logger, {'description': ''}, srv) m = Mod('m', logger, {'description': '', 'att': 'a'}, srv) assert m.propertyValues['att'] == 'a' - srv.dispatcher.register_module(a, 'a') - srv.dispatcher.register_module(m, 'm') + srv.secnode.add_module(a, 'a') + srv.secnode.add_module(m, 'm') assert m.att == a diff --git a/test/test_handler.py b/test/test_handler.py index 2215673..2e738fd 100644 --- a/test/test_handler.py +++ b/test/test_handler.py @@ -58,6 +58,7 @@ logger = LoggerStub() class ServerStub: def __init__(self, updates): self.dispatcher = DispatcherStub(updates) + self.secnode = None class ModuleTest(Module): diff --git a/test/test_logging.py b/test/test_logging.py index bd1af39..d6a9428 100644 --- a/test/test_logging.py +++ b/test/test_logging.py @@ -29,11 +29,24 @@ import frappy.logging from frappy.logging import logger, generalConfig, HasComlog +class SecNodeStub: + def __init__(self): + self.modules = {} + self.name = "" + + def add_module(self, module, modname): + self.modules[modname] = module + + def get_module(self, modname): + return self.modules[modname] + + class ServerStub: restart = None shutdown = None def __init__(self): + self.secnode = SecNodeStub() self.dispatcher = Dispatcher('', logger.log.getChild('dispatcher'), {}, self) @@ -98,7 +111,7 @@ def init_(monkeypatch): def __init__(self, name, srv, **kwds): kwds['description'] = '' super().__init__(name or 'mod', logger.log.getChild(name), kwds, srv) - srv.dispatcher.register_module(self, name, name) + srv.secnode.add_module(self, name) self.result[:] = [] def earlyInit(self): diff --git a/test/test_modules.py b/test/test_modules.py index 7da7853..e1d8313 100644 --- a/test/test_modules.py +++ b/test/test_modules.py @@ -65,6 +65,7 @@ logger = LoggerStub() class ServerStub: def __init__(self, updates): self.dispatcher = DispatcherStub(updates) + self.secnode = None class DummyMultiEvent(threading.Event): @@ -712,6 +713,7 @@ def test_super_call(): class ServerStub1: def __init__(self, updates): self.dispatcher = DispatcherStub1(updates) + self.secnode = None updates = [] srv = ServerStub1(updates) diff --git a/test/test_persistent.py b/test/test_persistent.py index 2d58d4b..5823b45 100644 --- a/test/test_persistent.py +++ b/test/test_persistent.py @@ -30,6 +30,10 @@ from frappy.lib import generalConfig from frappy.persistent import PersistentParam, PersistentMixin +class SecNodeStub: + pass + + class DispatcherStub: def announce_update(self, modulename, pname, pobj): pass @@ -48,7 +52,8 @@ logger = LoggerStub() class ServerStub: def __init__(self, equipment_id): self.dispatcher = DispatcherStub() - self.dispatcher.equipment_id = equipment_id + self.secnode = SecNodeStub() + self.secnode.equipment_id = equipment_id class Mod(PersistentMixin, Module): diff --git a/test/test_poller.py b/test/test_poller.py index b47d38e..13ef74d 100644 --- a/test/test_poller.py +++ b/test/test_poller.py @@ -69,6 +69,7 @@ class ServerStub: def __init__(self): generalConfig.testinit() self.dispatcher = DispatcherStub() + self.secnode = None class Base(Module): diff --git a/test/test_statemachine.py b/test/test_statemachine.py index 2a49021..978d2cf 100644 --- a/test/test_statemachine.py +++ b/test/test_statemachine.py @@ -206,6 +206,7 @@ class DispatcherStub: class ServerStub: def __init__(self, updates): self.dispatcher = DispatcherStub(updates) + self.secnode = None class Mod(HasStates, Drivable):