From 757e96e7c0a042cfd6a79f7a10733619da53f793 Mon Sep 17 00:00:00 2001 From: Alexander Zaft Date: Wed, 4 Oct 2023 09:27:15 +0200 Subject: [PATCH] core: move module handling out of dispatcher Split module handling code from the dispatcher. The new class for managing Modules is called SecNode. * change logging to no longer need a reference to modobj * modules get a reference to the secnode obj instead of the dispatcher * intermediate usage fixes for frappy_psi/sea Change-Id: Ifee4bb47aa7a4508bb4a47c9a5873b7e2d5faf67 Reviewed-on: https://forge.frm2.tum.de/review/c/secop/frappy/+/32249 Reviewed-by: Alexander Zaft Tested-by: Jenkins Automated Tests --- frappy/io.py | 2 +- frappy/logging.py | 14 +- frappy/modulebase.py | 15 +- frappy/modules.py | 2 +- frappy/persistent.py | 2 +- frappy/protocol/dispatcher.py | 206 ++----------------------- frappy/secnode.py | 281 ++++++++++++++++++++++++++++++++++ frappy/server.py | 90 ++--------- frappy_psi/sea.py | 6 +- test/test_attach.py | 16 +- test/test_handler.py | 1 + test/test_logging.py | 15 +- test/test_modules.py | 2 + test/test_persistent.py | 7 +- test/test_poller.py | 1 + test/test_statemachine.py | 1 + 16 files changed, 373 insertions(+), 288 deletions(-) create mode 100644 frappy/secnode.py diff --git a/frappy/io.py b/frappy/io.py index e998b222..417d06a2 100644 --- a/frappy/io.py +++ b/frappy/io.py @@ -62,7 +62,7 @@ class HasIO(Module): io = self.ioClass(ioname, srv.log.getChild(ioname), opts, srv) # pylint: disable=not-callable io.callingModule = [] srv.modules[ioname] = io - srv.dispatcher.register_module(io, ioname) + srv.secnode.add_module(io, ioname) self.ioDict[self.uri] = ioname self.io = ioname diff --git a/frappy/logging.py b/frappy/logging.py index 122bfbc5..b9c20a4a 100644 --- a/frappy/logging.py +++ b/frappy/logging.py @@ -54,6 +54,8 @@ class RemoteLogHandler(mlzlog.Handler): def __init__(self): super().__init__() self.subscriptions = {} # dict[modname] of tuple(mobobj, dict [conn] of level) + # None will be replaced by a callback when one is first installed + self.send_log = None def emit(self, record): """unused""" @@ -61,18 +63,18 @@ class RemoteLogHandler(mlzlog.Handler): def handle(self, record): modname = record.name.split('.')[-1] try: - modobj, subscriptions = self.subscriptions[modname] + subscriptions = self.subscriptions[modname] except KeyError: return for conn, lev in subscriptions.items(): if record.levelno >= lev: - modobj.DISPATCHER.send_log_msg( - conn, modobj.name, LEVEL_NAMES[record.levelno], + self.send_log( # pylint: disable=not-callable + conn, modname, LEVEL_NAMES[record.levelno], record.getMessage()) - def set_conn_level(self, modobj, conn, level): + def set_conn_level(self, modname, conn, level): level = check_level(level) - modobj, subscriptions = self.subscriptions.setdefault(modobj.name, (modobj, {})) + subscriptions = self.subscriptions.setdefault(modname, {}) if level == OFF: subscriptions.pop(conn, None) else: @@ -126,7 +128,7 @@ class HasComlog: if self.comlog and generalConfig.initialized and generalConfig.comlog: self._comLog = mlzlog.Logger(f'COMLOG.{self.name}') self._comLog.handlers[:] = [] - directory = join(logger.logdir, logger.rootname, 'comlog', self.DISPATCHER.name) + directory = join(logger.logdir, logger.rootname, 'comlog', self.secNode.name) self._comLog.addHandler(ComLogfileHandler( directory, self.name, max_days=generalConfig.getint('comlog_days', 7))) return diff --git a/frappy/modulebase.py b/frappy/modulebase.py index 07a0f289..9d25648a 100644 --- a/frappy/modulebase.py +++ b/frappy/modulebase.py @@ -327,14 +327,13 @@ class Module(HasAccessibles): NoneOr(FloatRange(0)), export=False, default=None) enablePoll = True - # reference to the dispatcher (used for sending async updates) - DISPATCHER = None pollInfo = None triggerPoll = None # trigger event for polls. used on io modules and modules without io def __init__(self, name, logger, cfgdict, srv): - # remember the dispatcher object (for the async callbacks) - self.DISPATCHER = srv.dispatcher + # remember the secnode for interacting with other modules and the + # server + self.secNode = srv.secnode self.log = logger self.name = name self.valueCallbacks = {} @@ -349,6 +348,7 @@ class Module(HasAccessibles): self.attachedModules = {} self.errors = [] self._isinitialized = False + self.updateCallback = srv.dispatcher.announce_update # handle module properties # 1) make local copies of properties @@ -549,7 +549,7 @@ class Module(HasAccessibles): arg = value pobj.readerror = None if pobj.export: - self.DISPATCHER.announce_update(self.name, pname, pobj) + self.updateCallback(self.name, pname, pobj) cblist = callbacks[pname] for cb in cblist: try: @@ -818,15 +818,16 @@ class Module(HasAccessibles): except Exception: self.log.error(formatException()) - def setRemoteLogging(self, conn, level): + def setRemoteLogging(self, conn, level, send_log): if self.remoteLogHandler is None: for handler in self.log.handlers: if isinstance(handler, RemoteLogHandler): + handler.send_log = send_log self.remoteLogHandler = handler break else: raise ValueError('remote handler not found') - self.remoteLogHandler.set_conn_level(self, conn, level) + self.remoteLogHandler.set_conn_level(self.name, conn, level) def checkLimits(self, value, pname='target'): """check for limits diff --git a/frappy/modules.py b/frappy/modules.py index 70a303de..39dc7066 100644 --- a/frappy/modules.py +++ b/frappy/modules.py @@ -132,7 +132,7 @@ class Attached(Property): modulename = super().__get__(obj, owner) if not modulename: return None # happens when mandatory=False and modulename is not given - modobj = obj.DISPATCHER.get_module(modulename) + modobj = obj.secNode.get_module(modulename) if not isinstance(modobj, self.basecls): raise ConfigError(f'attached module {self.name}={modobj.name!r} ' f'must inherit from {self.basecls.__qualname__!r}') diff --git a/frappy/persistent.py b/frappy/persistent.py index c7649ee8..e43897c3 100644 --- a/frappy/persistent.py +++ b/frappy/persistent.py @@ -77,7 +77,7 @@ class PersistentMixin(Module): super().__init__(name, logger, cfgdict, srv) persistentdir = os.path.join(generalConfig.logdir, 'persistent') os.makedirs(persistentdir, exist_ok=True) - self.persistentFile = os.path.join(persistentdir, f'{self.DISPATCHER.equipment_id}.{self.name}.json') + self.persistentFile = os.path.join(persistentdir, f'{self.secNode.equipment_id}.{self.name}.json') self.initData = {} # "factory" settings loaded = self.loadPersistentData() for pname, pobj in self.parameters.items(): diff --git a/frappy/protocol/dispatcher.py b/frappy/protocol/dispatcher.py index 2252ebcd..c2d243bd 100644 --- a/frappy/protocol/dispatcher.py +++ b/frappy/protocol/dispatcher.py @@ -17,6 +17,7 @@ # Module authors: # Enrico Faulhaber # Markus Zolliker +# Alexander Zaft # # ***************************************************************************** """Dispatcher for SECoP Messages @@ -28,28 +29,18 @@ Interface to the service offering part: on the connectionobj or on activated connections - 'add_connection(connectionobj)' registers new connection - 'remove_connection(connectionobj)' removes now longer functional connection - -Interface to the modules: - - add_module(modulename, moduleobj, export=True) registers a new module under the - given name, may also register it for exporting (making accessible) - - get_module(modulename) returns the requested module or None - - remove_module(modulename_or_obj): removes the module (during shutdown) - """ import threading -import traceback -from collections import OrderedDict from time import time as currenttime from frappy.errors import NoSuchCommandError, NoSuchModuleError, \ - NoSuchParameterError, ProtocolError, ReadOnlyError, ConfigError + NoSuchParameterError, ProtocolError, ReadOnlyError from frappy.params import Parameter from frappy.protocol.messages import COMMANDREPLY, DESCRIPTIONREPLY, \ DISABLEEVENTSREPLY, ENABLEEVENTSREPLY, ERRORPREFIX, EVENTREPLY, \ - HEARTBEATREPLY, IDENTREPLY, IDENTREQUEST, READREPLY, WRITEREPLY, \ - LOGGING_REPLY, LOG_EVENT -from frappy.lib import get_class + HEARTBEATREPLY, IDENTREPLY, IDENTREQUEST, LOG_EVENT, LOGGING_REPLY, \ + READREPLY, WRITEREPLY def make_update(modulename, pobj): @@ -70,10 +61,7 @@ class Dispatcher: self.nodeprops[k] = options.pop(k) self.log = logger - # map ALL modulename -> moduleobj - self._modules = {} - # list of EXPORTED modules - self._export = [] + self.secnode = srv.secnode # list all connections self._connections = [] # active (i.e. broadcast-receiving) connections @@ -87,11 +75,6 @@ class Dispatcher: self.shutdown = srv.shutdown # handle to server self.srv = srv - # set of modules that failed creation - self.failed_modules = set() - # list of errors that occured during initialization - self.errors = [] - self.traceback_counter = 0 def broadcast_event(self, msg, reallyall=False): """broadcasts a msg to all active connections @@ -147,163 +130,8 @@ class Dispatcher: self._connections.remove(conn) self.reset_connection(conn) - def register_module(self, moduleobj, modulename, export=True): - self.log.debug('registering module %r as %s (export=%r)', - moduleobj, modulename, export) - self._modules[modulename] = moduleobj - if export: - self._export.append(modulename) - - def get_module(self, modulename): - """ Returns a fully initialized module. Or None, if something went - wrong during instatiating/initializing the module.""" - modobj = self.get_module_instance(modulename) - if modobj is None: - return None - if modobj._isinitialized: - return modobj - - # also call earlyInit on the modules - self.log.debug('initializing module %r', modulename) - try: - modobj.earlyInit() - if not modobj.earlyInitDone: - self.errors.append(f'{modobj.earlyInit.__qualname__} was not called, probably missing super call') - modobj.initModule() - if not modobj.initModuleDone: - self.errors.append(f'{modobj.initModule.__qualname__} was not called, probably missing super call') - except Exception as e: - if self.traceback_counter == 0: - self.log.exception(traceback.format_exc()) - self.traceback_counter += 1 - self.errors.append(f'error initializing {modulename}: {e!r}') - modobj._isinitialized = True - self.log.debug('initialized module %r', modulename) - return modobj - - def get_module_instance(self, modulename): - """ Returns the module in its current initialization state or creates a - new uninitialized modle to return. - - When creating a new module, srv.module_config is accessed to get the - modules configuration. - """ - if modulename in self._modules: - return self._modules[modulename] - if modulename in list(self._modules.values()): - # it's actually already the module object - return modulename - # create module from srv.module_cfg, store and return - self.log.debug('attempting to create module %r', modulename) - - opts = self.srv.module_cfg.get(modulename, None) - if opts is None: - raise NoSuchModuleError(f'Module {modulename!r} does not exist on this SEC-Node!') - pymodule = None - try: # pylint: disable=no-else-return - classname = opts.pop('cls') - if isinstance(classname, str): - pymodule = classname.rpartition('.')[0] - if pymodule in self.failed_modules: - # creation has failed already once, do not try again - return None - cls = get_class(classname) - else: - pymodule = classname.__module__ - if pymodule in self.failed_modules: - # creation has failed already once, do not try again - return None - cls = classname - except Exception as e: - if str(e) == 'no such class': - self.errors.append(f'{classname} not found') - else: - self.failed_modules.add(pymodule) - if self.traceback_counter == 0: - self.log.exception(traceback.format_exc()) - self.traceback_counter += 1 - self.errors.append(f'error importing {classname}') - return None - else: - try: - modobj = cls(modulename, self.log.parent.getChild(modulename), opts, self.srv) - except ConfigError as e: - self.errors.append(f'error creating module {modulename}:') - for errtxt in e.args[0] if isinstance(e.args[0], list) else [e.args[0]]: - self.errors.append(' ' + errtxt) - modobj = None - except Exception as e: - if self.traceback_counter == 0: - self.log.exception(traceback.format_exc()) - self.traceback_counter += 1 - self.errors.append(f'error creating {modulename}') - modobj = None - if modobj: - self.register_module(modobj, modulename, modobj.export) - self.srv.modules[modulename] = modobj # IS HERE THE CORRECT PLACE? - return modobj - - def remove_module(self, modulename_or_obj): - moduleobj = self.get_module(modulename_or_obj) - modulename = moduleobj.name - if modulename in self._export: - self._export.remove(modulename) - self._modules.pop(modulename) - self._subscriptions.pop(modulename, None) - for k in [kk for kk in self._subscriptions if kk.startswith(f'{modulename}:')]: - self._subscriptions.pop(k, None) - - def list_module_names(self): - # return a copy of our list - return self._export[:] - - def export_accessibles(self, modulename): - self.log.debug('export_accessibles(%r)', modulename) - if modulename in self._export: - # omit export=False params! - res = OrderedDict() - for aobj in self.get_module(modulename).accessibles.values(): - if aobj.export: - res[aobj.export] = aobj.for_export() - self.log.debug('list accessibles for module %s -> %r', - modulename, res) - return res - self.log.debug('-> module is not to be exported!') - return OrderedDict() - - def get_descriptive_data(self, specifier): - """returns a python object which upon serialisation results in the descriptive data""" - specifier = specifier or '' - modules = {} - result = {'modules': modules} - for modulename in self._export: - module = self.get_module(modulename) - if not module.export: - continue - # some of these need rework ! - mod_desc = {'accessibles': self.export_accessibles(modulename)} - mod_desc.update(module.exportProperties()) - mod_desc.pop('export', False) - modules[modulename] = mod_desc - modname, _, pname = specifier.partition(':') - if modname in modules: # extension to SECoP standard: description of a single module - result = modules[modname] - if pname in result['accessibles']: # extension to SECoP standard: description of a single accessible - # command is also accepted - result = result['accessibles'][pname] - elif pname: - raise NoSuchParameterError(f'Module {modname!r} has no parameter {pname!r}') - elif not modname or modname == '.': - result['equipment_id'] = self.equipment_id - result['firmware'] = 'FRAPPY - The Python Framework for SECoP' - result['version'] = '2021.02' - result.update(self.nodeprops) - else: - raise NoSuchModuleError(f'Module {modname!r} does not exist') - return result - def _execute_command(self, modulename, exportedname, argument=None): - moduleobj = self.get_module(modulename) + moduleobj = self.secnode.get_module(modulename) if moduleobj is None: raise NoSuchModuleError(f'Module {modulename!r} does not exist') @@ -322,7 +150,7 @@ class Dispatcher: return result, {'t': currenttime()} def _setParameterValue(self, modulename, exportedname, value): - moduleobj = self.get_module(modulename) + moduleobj = self.secnode.get_module(modulename) if moduleobj is None: raise NoSuchModuleError(f'Module {modulename!r} does not exist') @@ -343,7 +171,7 @@ class Dispatcher: return pobj.export_value(), {'t': pobj.timestamp} if pobj.timestamp else {} def _getParameterValue(self, modulename, exportedname): - moduleobj = self.get_module(modulename) + moduleobj = self.secnode.get_module(modulename) if moduleobj is None: raise NoSuchModuleError(f'Module {modulename!r} does not exist') @@ -400,7 +228,7 @@ class Dispatcher: return (IDENTREPLY, None, None) def handle_describe(self, conn, specifier, data): - return (DESCRIPTIONREPLY, specifier or '.', self.get_descriptive_data(specifier)) + return (DESCRIPTIONREPLY, specifier or '.', self.secnode.get_descriptive_data(specifier)) def handle_read(self, conn, specifier, data): if data: @@ -439,9 +267,9 @@ class Dispatcher: modulename, exportedname = specifier, None if ':' in specifier: modulename, exportedname = specifier.split(':', 1) - if modulename not in self._export: + if modulename not in self.secnode.export: raise NoSuchModuleError(f'Module {modulename!r} does not exist') - moduleobj = self.get_module(modulename) + moduleobj = self.secnode.get_module(modulename) if exportedname is not None: pname = moduleobj.accessiblename2attr.get(exportedname, True) if pname and pname not in moduleobj.accessibles: @@ -455,12 +283,12 @@ class Dispatcher: else: # activate all modules self._active_connections.add(conn) - modules = [(m, None) for m in self._export] + modules = [(m, None) for m in self.secnode.export] # send updates for all subscribed values. # note: The initial poll already happend before the server is active for modulename, pname in modules: - moduleobj = self._modules.get(modulename, None) + moduleobj = self.secnode.modules.get(modulename, None) if pname: conn.send_reply(make_update(modulename, moduleobj.parameters[pname])) continue @@ -484,13 +312,13 @@ class Dispatcher: conn.send_reply((LOG_EVENT, f'{modname}:{level}', msg)) def set_all_log_levels(self, conn, level): - for modobj in self._modules.values(): - modobj.setRemoteLogging(conn, level) + for modobj in self.secnode.modules.values(): + modobj.setRemoteLogging(conn, level, self.send_log_msg) def handle_logging(self, conn, specifier, level): if specifier and specifier != '.': - modobj = self._modules[specifier] - modobj.setRemoteLogging(conn, level) + modobj = self.secnode.modules[specifier] + modobj.setRemoteLogging(conn, level, self.send_log_msg) else: self.set_all_log_levels(conn, level) return LOGGING_REPLY, specifier, level diff --git a/frappy/secnode.py b/frappy/secnode.py new file mode 100644 index 00000000..6b7b91a3 --- /dev/null +++ b/frappy/secnode.py @@ -0,0 +1,281 @@ +# ***************************************************************************** +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free Software +# Foundation; either version 2 of the License, or (at your option) any later +# version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# this program; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Module authors: +# Alexander Zaft +# +# ***************************************************************************** + +import traceback +from collections import OrderedDict + +from frappy.dynamic import Pinata +from frappy.errors import ConfigError, NoSuchModuleError, NoSuchParameterError +from frappy.lib import get_class + + +class SecNode: + """Managing the modules. + + Interface to the modules: + - add_module(module, modulename) + - get_module(modulename) returns the requested module or None if there is + no suitable configuration on the server + """ + def __init__(self, name, logger, options, srv): + self.equipment_id = options.pop('equipment_id', name) + self.nodeprops = {} + for k in list(options): + self.nodeprops[k] = options.pop(k) + # map ALL modulename -> moduleobj + self.modules = {} + # list of EXPORTED modules + self.export = [] + self.log = logger + self.srv = srv + # set of modules that failed creation + self.failed_modules = set() + # list of errors that occured during initialization + self.errors = [] + self.traceback_counter = 0 + self.name = name + + def get_module(self, modulename): + """ Returns a fully initialized module. Or None, if something went + wrong during instatiating/initializing the module.""" + modobj = self.get_module_instance(modulename) + if modobj is None: + return None + if modobj._isinitialized: + return modobj + + # also call earlyInit on the modules + self.log.debug('initializing module %r', modulename) + try: + modobj.earlyInit() + if not modobj.earlyInitDone: + self.errors.append(f'{modobj.earlyInit.__qualname__} was not ' + f'called, probably missing super call') + modobj.initModule() + if not modobj.initModuleDone: + self.errors.append(f'{modobj.initModule.__qualname__} was not ' + f'called, probably missing super call') + except Exception as e: + if self.traceback_counter == 0: + self.log.exception(traceback.format_exc()) + self.traceback_counter += 1 + self.errors.append(f'error initializing {modulename}: {e!r}') + modobj._isinitialized = True + self.log.debug('initialized module %r', modulename) + return modobj + + def get_module_instance(self, modulename): + """ Returns the module in its current initialization state or creates a + new uninitialized modle to return. + + When creating a new module, srv.module_config is accessed to get the + modules configuration. + """ + if modulename in self.modules: + return self.modules[modulename] + if modulename in list(self.modules.values()): + # it's actually already the module object + return modulename + # create module from srv.module_cfg, store and return + self.log.debug('attempting to create module %r', modulename) + + opts = self.srv.module_cfg.get(modulename, None) + if opts is None: + raise NoSuchModuleError(f'Module {modulename!r} does not exist on ' + f'this SEC-Node!') + pymodule = None + try: # pylint: disable=no-else-return + classname = opts.pop('cls') + if isinstance(classname, str): + pymodule = classname.rpartition('.')[0] + if pymodule in self.failed_modules: + # creation has failed already once, do not try again + return None + cls = get_class(classname) + else: + pymodule = classname.__module__ + if pymodule in self.failed_modules: + # creation has failed already once, do not try again + return None + cls = classname + except Exception as e: + if str(e) == 'no such class': + self.errors.append(f'{classname} not found') + else: + self.failed_modules.add(pymodule) + if self.traceback_counter == 0: + self.log.exception(traceback.format_exc()) + self.traceback_counter += 1 + self.errors.append(f'error importing {classname}') + return None + else: + try: + modobj = cls(modulename, self.log.parent.getChild(modulename), + opts, self.srv) + except ConfigError as e: + self.errors.append(f'error creating module {modulename}:') + for errtxt in e.args[0] if isinstance(e.args[0], list) else [e.args[0]]: + self.errors.append(' ' + errtxt) + modobj = None + except Exception as e: + if self.traceback_counter == 0: + self.log.exception(traceback.format_exc()) + self.traceback_counter += 1 + self.errors.append(f'error creating {modulename}') + modobj = None + if modobj: + self.add_module(modobj, modulename) + return modobj + + def create_modules(self): + self.modules = OrderedDict() + + # create and initialize modules + todos = list(self.srv.module_cfg.items()) + while todos: + modname, options = todos.pop(0) + if modname in self.modules: + # already created via Attached + continue + # For Pinata modules: we need to access this in Self.get_module + self.srv.module_cfg[modname] = dict(options) + modobj = self.get_module_instance(modname) # lazy + if modobj is None: + self.log.debug('Module %s returned None', modname) + continue + self.modules[modname] = modobj + if isinstance(modobj, Pinata): + # scan for dynamic devices + pinata = self.get_module(modname) + pinata_modules = list(pinata.scanModules()) + for name, _cfg in pinata_modules: + if name in self.srv.module_cfg: + self.log.error('Module %s, from pinata %s, already ' + 'exists in config file!', name, modname) + self.log.info('Pinata %s found %d modules', + modname, len(pinata_modules)) + todos.extend(pinata_modules) + + def export_accessibles(self, modulename): + self.log.debug('export_accessibles(%r)', modulename) + if modulename in self.export: + # omit export=False params! + res = OrderedDict() + for aobj in self.get_module(modulename).accessibles.values(): + if aobj.export: + res[aobj.export] = aobj.for_export() + self.log.debug('list accessibles for module %s -> %r', + modulename, res) + return res + self.log.debug('-> module is not to be exported!') + return OrderedDict() + + def get_descriptive_data(self, specifier): + """returns a python object which upon serialisation results in the + descriptive data""" + specifier = specifier or '' + modules = {} + result = {'modules': modules} + for modulename in self.export: + module = self.get_module(modulename) + if not module.export: + continue + # some of these need rework ! + mod_desc = {'accessibles': self.export_accessibles(modulename)} + mod_desc.update(module.exportProperties()) + mod_desc.pop('export', False) + modules[modulename] = mod_desc + modname, _, pname = specifier.partition(':') + if modname in modules: # extension to SECoP standard: description of a single module + result = modules[modname] + if pname in result['accessibles']: # extension to SECoP standard: description of a single accessible + # command is also accepted + result = result['accessibles'][pname] + elif pname: + raise NoSuchParameterError(f'Module {modname!r} ' + f'has no parameter {pname!r}') + elif not modname or modname == '.': + result['equipment_id'] = self.equipment_id + result['firmware'] = 'FRAPPY - The Python Framework for SECoP' + result['version'] = '2021.02' + result.update(self.nodeprops) + else: + raise NoSuchModuleError(f'Module {modname!r} does not exist') + return result + + def add_module(self, module, modulename): + """Adds a named module object to this SecNode.""" + self.modules[modulename] = module + if module.export: + self.export.append(modulename) + + # def remove_module(self, modulename_or_obj): + # moduleobj = self.get_module(modulename_or_obj) + # modulename = moduleobj.name + # if modulename in self.export: + # self.export.remove(modulename) + # self.modules.pop(modulename) + # self._subscriptions.pop(modulename, None) + # for k in [kk for kk in self._subscriptions if kk.startswith(f'{modulename}:')]: + # self._subscriptions.pop(k, None) + + def shutdown_modules(self): + """Call 'shutdownModule' for all modules.""" + for name in self._getSortedModules(): + self.modules[name].shutdownModule() + + def _getSortedModules(self): + """Sort modules topologically by inverse dependency. + + Example: if there is an IO device A and module B depends on it, then + the result will be [B, A]. + Right now, if the dependency graph is not a DAG, we give up and return + the unvisited nodes to be dismantled at the end. + Taken from Introduction to Algorithms [CLRS]. + """ + def go(name): + if name in done: # visiting a node + return True + if name in visited: + visited.add(name) + return False # cycle in dependencies -> fail + visited.add(name) + if name in unmarked: + unmarked.remove(name) + for module in self.modules[name].attachedModules.values(): + res = go(module.name) + if not res: + return False + visited.remove(name) + done.add(name) + l.append(name) + return True + + unmarked = set(self.modules.keys()) # unvisited nodes + visited = set() # visited in DFS, but not completed + done = set() + l = [] # list of sorted modules + + while unmarked: + if not go(unmarked.pop()): + self.log.error('cyclical dependency between modules!') + return l[::-1] + list(visited) + list(unmarked) + return l[::-1] diff --git a/frappy/server.py b/frappy/server.py index e0d0c444..1e4f3fe3 100644 --- a/frappy/server.py +++ b/frappy/server.py @@ -25,14 +25,13 @@ import os import signal import sys -from collections import OrderedDict from frappy.config import load_config from frappy.errors import ConfigError -from frappy.dynamic import Pinata from frappy.lib import formatException, generalConfig, get_class, mkthread from frappy.lib.multievent import MultiEvent from frappy.params import PREDEFINED_ACCESSIBLES +from frappy.secnode import SecNode try: from daemon import DaemonContext @@ -175,14 +174,13 @@ class Server: # server_close() called by 'with' self.log.info(f'stopped listenning, cleaning up' - f' {len(self.modules)} modules') + f' {len(self.secnode.modules)} modules') # if systemd: # if self._restart: # systemd.daemon.notify('RELOADING=1') # else: # systemd.daemon.notify('STOPPING=1') - for name in self._getSortedModules(): - self.modules[name].shutdownModule() + self.secnode.shutdown_modules() if self._restart: self.restart_hook() self.log.info('restarting') @@ -209,50 +207,27 @@ class Server: errors = [] opts = dict(self.node_cfg) cls = get_class(opts.pop('cls')) - self.dispatcher = cls(opts.pop('name', self._cfgfiles), - self.log.getChild('dispatcher'), opts, self) + name = opts.pop('name', self._cfgfiles) + # TODO: opts not in both + self.secnode = SecNode(name, self.log.getChild('secnode'), opts, self) + self.dispatcher = cls(name, self.log.getChild('dispatcher'), opts, self) if opts: - self.dispatcher.errors.append(self.unknown_options(cls, opts)) - self.modules = OrderedDict() - - # create and initialize modules - todos = list(self.module_cfg.items()) - while todos: - modname, options = todos.pop(0) - if modname in self.modules: - # already created by Dispatcher (via Attached) - continue - # For Pinata modules: we need to access this in Dispatcher.get_module - self.module_cfg[modname] = dict(options) - modobj = self.dispatcher.get_module_instance(modname) # lazy - if modobj is None: - self.log.debug('Module %s returned None', modname) - continue - self.modules[modname] = modobj - if isinstance(modobj, Pinata): - # scan for dynamic devices - pinata = self.dispatcher.get_module(modname) - pinata_modules = list(pinata.scanModules()) - for name, _cfg in pinata_modules: - if name in self.module_cfg: - self.log.error('Module %s, from pinata %s, already' - ' exists in config file!', name, modname) - self.log.info('Pinata %s found %d modules', modname, len(pinata_modules)) - todos.extend(pinata_modules) + self.secnode.errors.append(self.unknown_options(cls, opts)) + self.secnode.create_modules() # initialize all modules by getting them with Dispatcher.get_module, # which is done in the get_descriptive data # TODO: caching, to not make this extra work - self.dispatcher.get_descriptive_data('') + self.secnode.get_descriptive_data('') # =========== All modules are initialized =========== # all errors from initialization process - errors = self.dispatcher.errors + errors = self.secnode.errors if not self._testonly: start_events = MultiEvent(default_timeout=30) - for modname, modobj in self.modules.items(): + for modname, modobj in self.secnode.modules.items(): # startModule must return either a timeout value or None (default 30 sec) start_events.name = f'module {modname}' modobj.startModule(start_events) @@ -279,7 +254,8 @@ class Server: self.log.info('all modules started') history_path = os.environ.get('FRAPPY_HISTORY') if history_path: - from frappy_psi.historywriter import FrappyHistoryWriter # pylint: disable=import-outside-toplevel + from frappy_psi.historywriter import \ + FrappyHistoryWriter # pylint: disable=import-outside-toplevel writer = FrappyHistoryWriter(history_path, PREDEFINED_ACCESSIBLES.keys(), self.dispatcher) # treat writer as a connection self.dispatcher.add_connection(writer) @@ -292,41 +268,3 @@ class Server: # history_path = os.environ.get('ALTERNATIVE_HISTORY') # if history_path: # from frappy_.historywriter import ... etc. - - def _getSortedModules(self): - """Sort modules topologically by inverse dependency. - - Example: if there is an IO device A and module B depends on it, then - the result will be [B, A]. - Right now, if the dependency graph is not a DAG, we give up and return - the unvisited nodes to be dismantled at the end. - Taken from Introduction to Algorithms [CLRS]. - """ - def go(name): - if name in done: # visiting a node - return True - if name in visited: - visited.add(name) - return False # cycle in dependencies -> fail - visited.add(name) - if name in unmarked: - unmarked.remove(name) - for module in self.modules[name].attachedModules.values(): - res = go(module.name) - if not res: - return False - visited.remove(name) - done.add(name) - l.append(name) - return True - - unmarked = set(self.modules.keys()) # unvisited nodes - visited = set() # visited in DFS, but not completed - done = set() - l = [] # list of sorted modules - - while unmarked: - if not go(unmarked.pop()): - self.log.error('cyclical dependency between modules!') - return l[::-1] + list(visited) + list(unmarked) - return l[::-1] diff --git a/frappy_psi/sea.py b/frappy_psi/sea.py index cd1e2c0d..16a17e94 100644 --- a/frappy_psi/sea.py +++ b/frappy_psi/sea.py @@ -253,7 +253,7 @@ class SeaClient(ProxyClient, Module): if result == '1': self.asynio.writeline(('get_all_param ' + ' '.join(self.objects)).encode()) else: - self.DISPATCHER.shutdown() + self.secNode.srv.shutdown() try: reply = self.asynio.readline() if reply is None: @@ -314,7 +314,7 @@ class SeaClient(ProxyClient, Module): if path == '/device/changetime': recheck = time.time() + 1 elif path.startswith('/device/frappy_%s' % self.service) and value == '': - self.DISPATCHER.shutdown() + self.secNode.srv.shutdown() else: for module, param in mplist: oldv, oldt, oldr = self.cache.get((module, param), [None, None, None]) @@ -657,7 +657,7 @@ class SeaModule(Module): readerror = secop_error(e) pobj.readerror = readerror if pobj.export: - self.DISPATCHER.broadcast_event(make_update(self.name, pobj)) + self.secNode.srv.dispatcher.broadcast_event(make_update(self.name, pobj)) def initModule(self): self.io.register_obj(self, self.sea_object) diff --git a/test/test_attach.py b/test/test_attach.py index 2382dd34..b57f05a0 100644 --- a/test/test_attach.py +++ b/test/test_attach.py @@ -56,11 +56,23 @@ class LoggerStub: logger = LoggerStub() +class SecNodeStub: + def __init__(self): + self.modules = {} + + def add_module(self, module, modname): + self.modules[modname] = module + + def get_module(self, modname): + return self.modules[modname] + + class ServerStub: restart = None shutdown = None def __init__(self): + self.secnode = SecNodeStub() self.dispatcher = Dispatcher('dispatcher', logger, {}, self) @@ -72,6 +84,6 @@ def test_attach(): a = Module('a', logger, {'description': ''}, srv) m = Mod('m', logger, {'description': '', 'att': 'a'}, srv) assert m.propertyValues['att'] == 'a' - srv.dispatcher.register_module(a, 'a') - srv.dispatcher.register_module(m, 'm') + srv.secnode.add_module(a, 'a') + srv.secnode.add_module(m, 'm') assert m.att == a diff --git a/test/test_handler.py b/test/test_handler.py index 4680089c..26d87ba0 100644 --- a/test/test_handler.py +++ b/test/test_handler.py @@ -57,6 +57,7 @@ logger = LoggerStub() class ServerStub: def __init__(self, updates): self.dispatcher = DispatcherStub(updates) + self.secnode = None class ModuleTest(Module): diff --git a/test/test_logging.py b/test/test_logging.py index 8b9dd88e..1136e80c 100644 --- a/test/test_logging.py +++ b/test/test_logging.py @@ -28,11 +28,24 @@ import frappy.logging from frappy.logging import logger, generalConfig, HasComlog +class SecNodeStub: + def __init__(self): + self.modules = {} + self.name = "" + + def add_module(self, module, modname): + self.modules[modname] = module + + def get_module(self, modname): + return self.modules[modname] + + class ServerStub: restart = None shutdown = None def __init__(self): + self.secnode = SecNodeStub() self.dispatcher = Dispatcher('', logger.log.getChild('dispatcher'), {}, self) @@ -97,7 +110,7 @@ def init_(monkeypatch): def __init__(self, name, srv, **kwds): kwds['description'] = '' super().__init__(name or 'mod', logger.log.getChild(name), kwds, srv) - srv.dispatcher.register_module(self, name, name) + srv.secnode.add_module(self, name) self.result[:] = [] def earlyInit(self): diff --git a/test/test_modules.py b/test/test_modules.py index 19c17b23..f636cf44 100644 --- a/test/test_modules.py +++ b/test/test_modules.py @@ -64,6 +64,7 @@ logger = LoggerStub() class ServerStub: def __init__(self, updates): self.dispatcher = DispatcherStub(updates) + self.secnode = None class DummyMultiEvent(threading.Event): @@ -711,6 +712,7 @@ def test_super_call(): class ServerStub1: def __init__(self, updates): self.dispatcher = DispatcherStub1(updates) + self.secnode = None updates = [] srv = ServerStub1(updates) diff --git a/test/test_persistent.py b/test/test_persistent.py index de7a22dc..be931cd6 100644 --- a/test/test_persistent.py +++ b/test/test_persistent.py @@ -29,6 +29,10 @@ from frappy.lib import generalConfig from frappy.persistent import PersistentParam, PersistentMixin +class SecNodeStub: + pass + + class DispatcherStub: def announce_update(self, modulename, pname, pobj): pass @@ -47,7 +51,8 @@ logger = LoggerStub() class ServerStub: def __init__(self, equipment_id): self.dispatcher = DispatcherStub() - self.dispatcher.equipment_id = equipment_id + self.secnode = SecNodeStub() + self.secnode.equipment_id = equipment_id class Mod(PersistentMixin, Module): diff --git a/test/test_poller.py b/test/test_poller.py index 8b294439..15f8ac31 100644 --- a/test/test_poller.py +++ b/test/test_poller.py @@ -68,6 +68,7 @@ class ServerStub: def __init__(self): generalConfig.testinit() self.dispatcher = DispatcherStub() + self.secnode = None class Base(Module): diff --git a/test/test_statemachine.py b/test/test_statemachine.py index a3cba310..876480c7 100644 --- a/test/test_statemachine.py +++ b/test/test_statemachine.py @@ -205,6 +205,7 @@ class DispatcherStub: class ServerStub: def __init__(self, updates): self.dispatcher = DispatcherStub(updates) + self.secnode = None class Mod(HasStates, Drivable):