core: move module handling out of dispatcher

Split module handling code from the dispatcher.
The new class for managing Modules is called SecNode.

* change logging to no longer need a reference to modobj
* modules get a reference to the secnode obj instead of the
  dispatcher
* intermediate usage fixes for frappy_psi/sea

Change-Id: Ifee4bb47aa7a4508bb4a47c9a5873b7e2d5faf67
Reviewed-on: https://forge.frm2.tum.de/review/c/secop/frappy/+/32249
Reviewed-by: Alexander Zaft <a.zaft@fz-juelich.de>
Tested-by: Jenkins Automated Tests <pedersen+jenkins@frm2.tum.de>
This commit is contained in:
Alexander Zaft
2023-10-04 09:27:15 +02:00
parent 8d26ab4fe2
commit 757e96e7c0
16 changed files with 373 additions and 288 deletions

View File

@ -62,7 +62,7 @@ class HasIO(Module):
io = self.ioClass(ioname, srv.log.getChild(ioname), opts, srv) # pylint: disable=not-callable
io.callingModule = []
srv.modules[ioname] = io
srv.dispatcher.register_module(io, ioname)
srv.secnode.add_module(io, ioname)
self.ioDict[self.uri] = ioname
self.io = ioname

View File

@ -54,6 +54,8 @@ class RemoteLogHandler(mlzlog.Handler):
def __init__(self):
super().__init__()
self.subscriptions = {} # dict[modname] of tuple(mobobj, dict [conn] of level)
# None will be replaced by a callback when one is first installed
self.send_log = None
def emit(self, record):
"""unused"""
@ -61,18 +63,18 @@ class RemoteLogHandler(mlzlog.Handler):
def handle(self, record):
modname = record.name.split('.')[-1]
try:
modobj, subscriptions = self.subscriptions[modname]
subscriptions = self.subscriptions[modname]
except KeyError:
return
for conn, lev in subscriptions.items():
if record.levelno >= lev:
modobj.DISPATCHER.send_log_msg(
conn, modobj.name, LEVEL_NAMES[record.levelno],
self.send_log( # pylint: disable=not-callable
conn, modname, LEVEL_NAMES[record.levelno],
record.getMessage())
def set_conn_level(self, modobj, conn, level):
def set_conn_level(self, modname, conn, level):
level = check_level(level)
modobj, subscriptions = self.subscriptions.setdefault(modobj.name, (modobj, {}))
subscriptions = self.subscriptions.setdefault(modname, {})
if level == OFF:
subscriptions.pop(conn, None)
else:
@ -126,7 +128,7 @@ class HasComlog:
if self.comlog and generalConfig.initialized and generalConfig.comlog:
self._comLog = mlzlog.Logger(f'COMLOG.{self.name}')
self._comLog.handlers[:] = []
directory = join(logger.logdir, logger.rootname, 'comlog', self.DISPATCHER.name)
directory = join(logger.logdir, logger.rootname, 'comlog', self.secNode.name)
self._comLog.addHandler(ComLogfileHandler(
directory, self.name, max_days=generalConfig.getint('comlog_days', 7)))
return

View File

@ -327,14 +327,13 @@ class Module(HasAccessibles):
NoneOr(FloatRange(0)), export=False, default=None)
enablePoll = True
# reference to the dispatcher (used for sending async updates)
DISPATCHER = None
pollInfo = None
triggerPoll = None # trigger event for polls. used on io modules and modules without io
def __init__(self, name, logger, cfgdict, srv):
# remember the dispatcher object (for the async callbacks)
self.DISPATCHER = srv.dispatcher
# remember the secnode for interacting with other modules and the
# server
self.secNode = srv.secnode
self.log = logger
self.name = name
self.valueCallbacks = {}
@ -349,6 +348,7 @@ class Module(HasAccessibles):
self.attachedModules = {}
self.errors = []
self._isinitialized = False
self.updateCallback = srv.dispatcher.announce_update
# handle module properties
# 1) make local copies of properties
@ -549,7 +549,7 @@ class Module(HasAccessibles):
arg = value
pobj.readerror = None
if pobj.export:
self.DISPATCHER.announce_update(self.name, pname, pobj)
self.updateCallback(self.name, pname, pobj)
cblist = callbacks[pname]
for cb in cblist:
try:
@ -818,15 +818,16 @@ class Module(HasAccessibles):
except Exception:
self.log.error(formatException())
def setRemoteLogging(self, conn, level):
def setRemoteLogging(self, conn, level, send_log):
if self.remoteLogHandler is None:
for handler in self.log.handlers:
if isinstance(handler, RemoteLogHandler):
handler.send_log = send_log
self.remoteLogHandler = handler
break
else:
raise ValueError('remote handler not found')
self.remoteLogHandler.set_conn_level(self, conn, level)
self.remoteLogHandler.set_conn_level(self.name, conn, level)
def checkLimits(self, value, pname='target'):
"""check for limits

View File

@ -132,7 +132,7 @@ class Attached(Property):
modulename = super().__get__(obj, owner)
if not modulename:
return None # happens when mandatory=False and modulename is not given
modobj = obj.DISPATCHER.get_module(modulename)
modobj = obj.secNode.get_module(modulename)
if not isinstance(modobj, self.basecls):
raise ConfigError(f'attached module {self.name}={modobj.name!r} '
f'must inherit from {self.basecls.__qualname__!r}')

View File

@ -77,7 +77,7 @@ class PersistentMixin(Module):
super().__init__(name, logger, cfgdict, srv)
persistentdir = os.path.join(generalConfig.logdir, 'persistent')
os.makedirs(persistentdir, exist_ok=True)
self.persistentFile = os.path.join(persistentdir, f'{self.DISPATCHER.equipment_id}.{self.name}.json')
self.persistentFile = os.path.join(persistentdir, f'{self.secNode.equipment_id}.{self.name}.json')
self.initData = {} # "factory" settings
loaded = self.loadPersistentData()
for pname, pobj in self.parameters.items():

View File

@ -17,6 +17,7 @@
# Module authors:
# Enrico Faulhaber <enrico.faulhaber@frm2.tum.de>
# Markus Zolliker <markus.zolliker@psi.ch>
# Alexander Zaft <a.zaft@fz-juelich.de>
#
# *****************************************************************************
"""Dispatcher for SECoP Messages
@ -28,28 +29,18 @@ Interface to the service offering part:
on the connectionobj or on activated connections
- 'add_connection(connectionobj)' registers new connection
- 'remove_connection(connectionobj)' removes now longer functional connection
Interface to the modules:
- add_module(modulename, moduleobj, export=True) registers a new module under the
given name, may also register it for exporting (making accessible)
- get_module(modulename) returns the requested module or None
- remove_module(modulename_or_obj): removes the module (during shutdown)
"""
import threading
import traceback
from collections import OrderedDict
from time import time as currenttime
from frappy.errors import NoSuchCommandError, NoSuchModuleError, \
NoSuchParameterError, ProtocolError, ReadOnlyError, ConfigError
NoSuchParameterError, ProtocolError, ReadOnlyError
from frappy.params import Parameter
from frappy.protocol.messages import COMMANDREPLY, DESCRIPTIONREPLY, \
DISABLEEVENTSREPLY, ENABLEEVENTSREPLY, ERRORPREFIX, EVENTREPLY, \
HEARTBEATREPLY, IDENTREPLY, IDENTREQUEST, READREPLY, WRITEREPLY, \
LOGGING_REPLY, LOG_EVENT
from frappy.lib import get_class
HEARTBEATREPLY, IDENTREPLY, IDENTREQUEST, LOG_EVENT, LOGGING_REPLY, \
READREPLY, WRITEREPLY
def make_update(modulename, pobj):
@ -70,10 +61,7 @@ class Dispatcher:
self.nodeprops[k] = options.pop(k)
self.log = logger
# map ALL modulename -> moduleobj
self._modules = {}
# list of EXPORTED modules
self._export = []
self.secnode = srv.secnode
# list all connections
self._connections = []
# active (i.e. broadcast-receiving) connections
@ -87,11 +75,6 @@ class Dispatcher:
self.shutdown = srv.shutdown
# handle to server
self.srv = srv
# set of modules that failed creation
self.failed_modules = set()
# list of errors that occured during initialization
self.errors = []
self.traceback_counter = 0
def broadcast_event(self, msg, reallyall=False):
"""broadcasts a msg to all active connections
@ -147,163 +130,8 @@ class Dispatcher:
self._connections.remove(conn)
self.reset_connection(conn)
def register_module(self, moduleobj, modulename, export=True):
self.log.debug('registering module %r as %s (export=%r)',
moduleobj, modulename, export)
self._modules[modulename] = moduleobj
if export:
self._export.append(modulename)
def get_module(self, modulename):
""" Returns a fully initialized module. Or None, if something went
wrong during instatiating/initializing the module."""
modobj = self.get_module_instance(modulename)
if modobj is None:
return None
if modobj._isinitialized:
return modobj
# also call earlyInit on the modules
self.log.debug('initializing module %r', modulename)
try:
modobj.earlyInit()
if not modobj.earlyInitDone:
self.errors.append(f'{modobj.earlyInit.__qualname__} was not called, probably missing super call')
modobj.initModule()
if not modobj.initModuleDone:
self.errors.append(f'{modobj.initModule.__qualname__} was not called, probably missing super call')
except Exception as e:
if self.traceback_counter == 0:
self.log.exception(traceback.format_exc())
self.traceback_counter += 1
self.errors.append(f'error initializing {modulename}: {e!r}')
modobj._isinitialized = True
self.log.debug('initialized module %r', modulename)
return modobj
def get_module_instance(self, modulename):
""" Returns the module in its current initialization state or creates a
new uninitialized modle to return.
When creating a new module, srv.module_config is accessed to get the
modules configuration.
"""
if modulename in self._modules:
return self._modules[modulename]
if modulename in list(self._modules.values()):
# it's actually already the module object
return modulename
# create module from srv.module_cfg, store and return
self.log.debug('attempting to create module %r', modulename)
opts = self.srv.module_cfg.get(modulename, None)
if opts is None:
raise NoSuchModuleError(f'Module {modulename!r} does not exist on this SEC-Node!')
pymodule = None
try: # pylint: disable=no-else-return
classname = opts.pop('cls')
if isinstance(classname, str):
pymodule = classname.rpartition('.')[0]
if pymodule in self.failed_modules:
# creation has failed already once, do not try again
return None
cls = get_class(classname)
else:
pymodule = classname.__module__
if pymodule in self.failed_modules:
# creation has failed already once, do not try again
return None
cls = classname
except Exception as e:
if str(e) == 'no such class':
self.errors.append(f'{classname} not found')
else:
self.failed_modules.add(pymodule)
if self.traceback_counter == 0:
self.log.exception(traceback.format_exc())
self.traceback_counter += 1
self.errors.append(f'error importing {classname}')
return None
else:
try:
modobj = cls(modulename, self.log.parent.getChild(modulename), opts, self.srv)
except ConfigError as e:
self.errors.append(f'error creating module {modulename}:')
for errtxt in e.args[0] if isinstance(e.args[0], list) else [e.args[0]]:
self.errors.append(' ' + errtxt)
modobj = None
except Exception as e:
if self.traceback_counter == 0:
self.log.exception(traceback.format_exc())
self.traceback_counter += 1
self.errors.append(f'error creating {modulename}')
modobj = None
if modobj:
self.register_module(modobj, modulename, modobj.export)
self.srv.modules[modulename] = modobj # IS HERE THE CORRECT PLACE?
return modobj
def remove_module(self, modulename_or_obj):
moduleobj = self.get_module(modulename_or_obj)
modulename = moduleobj.name
if modulename in self._export:
self._export.remove(modulename)
self._modules.pop(modulename)
self._subscriptions.pop(modulename, None)
for k in [kk for kk in self._subscriptions if kk.startswith(f'{modulename}:')]:
self._subscriptions.pop(k, None)
def list_module_names(self):
# return a copy of our list
return self._export[:]
def export_accessibles(self, modulename):
self.log.debug('export_accessibles(%r)', modulename)
if modulename in self._export:
# omit export=False params!
res = OrderedDict()
for aobj in self.get_module(modulename).accessibles.values():
if aobj.export:
res[aobj.export] = aobj.for_export()
self.log.debug('list accessibles for module %s -> %r',
modulename, res)
return res
self.log.debug('-> module is not to be exported!')
return OrderedDict()
def get_descriptive_data(self, specifier):
"""returns a python object which upon serialisation results in the descriptive data"""
specifier = specifier or ''
modules = {}
result = {'modules': modules}
for modulename in self._export:
module = self.get_module(modulename)
if not module.export:
continue
# some of these need rework !
mod_desc = {'accessibles': self.export_accessibles(modulename)}
mod_desc.update(module.exportProperties())
mod_desc.pop('export', False)
modules[modulename] = mod_desc
modname, _, pname = specifier.partition(':')
if modname in modules: # extension to SECoP standard: description of a single module
result = modules[modname]
if pname in result['accessibles']: # extension to SECoP standard: description of a single accessible
# command is also accepted
result = result['accessibles'][pname]
elif pname:
raise NoSuchParameterError(f'Module {modname!r} has no parameter {pname!r}')
elif not modname or modname == '.':
result['equipment_id'] = self.equipment_id
result['firmware'] = 'FRAPPY - The Python Framework for SECoP'
result['version'] = '2021.02'
result.update(self.nodeprops)
else:
raise NoSuchModuleError(f'Module {modname!r} does not exist')
return result
def _execute_command(self, modulename, exportedname, argument=None):
moduleobj = self.get_module(modulename)
moduleobj = self.secnode.get_module(modulename)
if moduleobj is None:
raise NoSuchModuleError(f'Module {modulename!r} does not exist')
@ -322,7 +150,7 @@ class Dispatcher:
return result, {'t': currenttime()}
def _setParameterValue(self, modulename, exportedname, value):
moduleobj = self.get_module(modulename)
moduleobj = self.secnode.get_module(modulename)
if moduleobj is None:
raise NoSuchModuleError(f'Module {modulename!r} does not exist')
@ -343,7 +171,7 @@ class Dispatcher:
return pobj.export_value(), {'t': pobj.timestamp} if pobj.timestamp else {}
def _getParameterValue(self, modulename, exportedname):
moduleobj = self.get_module(modulename)
moduleobj = self.secnode.get_module(modulename)
if moduleobj is None:
raise NoSuchModuleError(f'Module {modulename!r} does not exist')
@ -400,7 +228,7 @@ class Dispatcher:
return (IDENTREPLY, None, None)
def handle_describe(self, conn, specifier, data):
return (DESCRIPTIONREPLY, specifier or '.', self.get_descriptive_data(specifier))
return (DESCRIPTIONREPLY, specifier or '.', self.secnode.get_descriptive_data(specifier))
def handle_read(self, conn, specifier, data):
if data:
@ -439,9 +267,9 @@ class Dispatcher:
modulename, exportedname = specifier, None
if ':' in specifier:
modulename, exportedname = specifier.split(':', 1)
if modulename not in self._export:
if modulename not in self.secnode.export:
raise NoSuchModuleError(f'Module {modulename!r} does not exist')
moduleobj = self.get_module(modulename)
moduleobj = self.secnode.get_module(modulename)
if exportedname is not None:
pname = moduleobj.accessiblename2attr.get(exportedname, True)
if pname and pname not in moduleobj.accessibles:
@ -455,12 +283,12 @@ class Dispatcher:
else:
# activate all modules
self._active_connections.add(conn)
modules = [(m, None) for m in self._export]
modules = [(m, None) for m in self.secnode.export]
# send updates for all subscribed values.
# note: The initial poll already happend before the server is active
for modulename, pname in modules:
moduleobj = self._modules.get(modulename, None)
moduleobj = self.secnode.modules.get(modulename, None)
if pname:
conn.send_reply(make_update(modulename, moduleobj.parameters[pname]))
continue
@ -484,13 +312,13 @@ class Dispatcher:
conn.send_reply((LOG_EVENT, f'{modname}:{level}', msg))
def set_all_log_levels(self, conn, level):
for modobj in self._modules.values():
modobj.setRemoteLogging(conn, level)
for modobj in self.secnode.modules.values():
modobj.setRemoteLogging(conn, level, self.send_log_msg)
def handle_logging(self, conn, specifier, level):
if specifier and specifier != '.':
modobj = self._modules[specifier]
modobj.setRemoteLogging(conn, level)
modobj = self.secnode.modules[specifier]
modobj.setRemoteLogging(conn, level, self.send_log_msg)
else:
self.set_all_log_levels(conn, level)
return LOGGING_REPLY, specifier, level

281
frappy/secnode.py Normal file
View File

@ -0,0 +1,281 @@
# *****************************************************************************
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Alexander Zaft <a.zaft@fz-juelich.de>
#
# *****************************************************************************
import traceback
from collections import OrderedDict
from frappy.dynamic import Pinata
from frappy.errors import ConfigError, NoSuchModuleError, NoSuchParameterError
from frappy.lib import get_class
class SecNode:
"""Managing the modules.
Interface to the modules:
- add_module(module, modulename)
- get_module(modulename) returns the requested module or None if there is
no suitable configuration on the server
"""
def __init__(self, name, logger, options, srv):
self.equipment_id = options.pop('equipment_id', name)
self.nodeprops = {}
for k in list(options):
self.nodeprops[k] = options.pop(k)
# map ALL modulename -> moduleobj
self.modules = {}
# list of EXPORTED modules
self.export = []
self.log = logger
self.srv = srv
# set of modules that failed creation
self.failed_modules = set()
# list of errors that occured during initialization
self.errors = []
self.traceback_counter = 0
self.name = name
def get_module(self, modulename):
""" Returns a fully initialized module. Or None, if something went
wrong during instatiating/initializing the module."""
modobj = self.get_module_instance(modulename)
if modobj is None:
return None
if modobj._isinitialized:
return modobj
# also call earlyInit on the modules
self.log.debug('initializing module %r', modulename)
try:
modobj.earlyInit()
if not modobj.earlyInitDone:
self.errors.append(f'{modobj.earlyInit.__qualname__} was not '
f'called, probably missing super call')
modobj.initModule()
if not modobj.initModuleDone:
self.errors.append(f'{modobj.initModule.__qualname__} was not '
f'called, probably missing super call')
except Exception as e:
if self.traceback_counter == 0:
self.log.exception(traceback.format_exc())
self.traceback_counter += 1
self.errors.append(f'error initializing {modulename}: {e!r}')
modobj._isinitialized = True
self.log.debug('initialized module %r', modulename)
return modobj
def get_module_instance(self, modulename):
""" Returns the module in its current initialization state or creates a
new uninitialized modle to return.
When creating a new module, srv.module_config is accessed to get the
modules configuration.
"""
if modulename in self.modules:
return self.modules[modulename]
if modulename in list(self.modules.values()):
# it's actually already the module object
return modulename
# create module from srv.module_cfg, store and return
self.log.debug('attempting to create module %r', modulename)
opts = self.srv.module_cfg.get(modulename, None)
if opts is None:
raise NoSuchModuleError(f'Module {modulename!r} does not exist on '
f'this SEC-Node!')
pymodule = None
try: # pylint: disable=no-else-return
classname = opts.pop('cls')
if isinstance(classname, str):
pymodule = classname.rpartition('.')[0]
if pymodule in self.failed_modules:
# creation has failed already once, do not try again
return None
cls = get_class(classname)
else:
pymodule = classname.__module__
if pymodule in self.failed_modules:
# creation has failed already once, do not try again
return None
cls = classname
except Exception as e:
if str(e) == 'no such class':
self.errors.append(f'{classname} not found')
else:
self.failed_modules.add(pymodule)
if self.traceback_counter == 0:
self.log.exception(traceback.format_exc())
self.traceback_counter += 1
self.errors.append(f'error importing {classname}')
return None
else:
try:
modobj = cls(modulename, self.log.parent.getChild(modulename),
opts, self.srv)
except ConfigError as e:
self.errors.append(f'error creating module {modulename}:')
for errtxt in e.args[0] if isinstance(e.args[0], list) else [e.args[0]]:
self.errors.append(' ' + errtxt)
modobj = None
except Exception as e:
if self.traceback_counter == 0:
self.log.exception(traceback.format_exc())
self.traceback_counter += 1
self.errors.append(f'error creating {modulename}')
modobj = None
if modobj:
self.add_module(modobj, modulename)
return modobj
def create_modules(self):
self.modules = OrderedDict()
# create and initialize modules
todos = list(self.srv.module_cfg.items())
while todos:
modname, options = todos.pop(0)
if modname in self.modules:
# already created via Attached
continue
# For Pinata modules: we need to access this in Self.get_module
self.srv.module_cfg[modname] = dict(options)
modobj = self.get_module_instance(modname) # lazy
if modobj is None:
self.log.debug('Module %s returned None', modname)
continue
self.modules[modname] = modobj
if isinstance(modobj, Pinata):
# scan for dynamic devices
pinata = self.get_module(modname)
pinata_modules = list(pinata.scanModules())
for name, _cfg in pinata_modules:
if name in self.srv.module_cfg:
self.log.error('Module %s, from pinata %s, already '
'exists in config file!', name, modname)
self.log.info('Pinata %s found %d modules',
modname, len(pinata_modules))
todos.extend(pinata_modules)
def export_accessibles(self, modulename):
self.log.debug('export_accessibles(%r)', modulename)
if modulename in self.export:
# omit export=False params!
res = OrderedDict()
for aobj in self.get_module(modulename).accessibles.values():
if aobj.export:
res[aobj.export] = aobj.for_export()
self.log.debug('list accessibles for module %s -> %r',
modulename, res)
return res
self.log.debug('-> module is not to be exported!')
return OrderedDict()
def get_descriptive_data(self, specifier):
"""returns a python object which upon serialisation results in the
descriptive data"""
specifier = specifier or ''
modules = {}
result = {'modules': modules}
for modulename in self.export:
module = self.get_module(modulename)
if not module.export:
continue
# some of these need rework !
mod_desc = {'accessibles': self.export_accessibles(modulename)}
mod_desc.update(module.exportProperties())
mod_desc.pop('export', False)
modules[modulename] = mod_desc
modname, _, pname = specifier.partition(':')
if modname in modules: # extension to SECoP standard: description of a single module
result = modules[modname]
if pname in result['accessibles']: # extension to SECoP standard: description of a single accessible
# command is also accepted
result = result['accessibles'][pname]
elif pname:
raise NoSuchParameterError(f'Module {modname!r} '
f'has no parameter {pname!r}')
elif not modname or modname == '.':
result['equipment_id'] = self.equipment_id
result['firmware'] = 'FRAPPY - The Python Framework for SECoP'
result['version'] = '2021.02'
result.update(self.nodeprops)
else:
raise NoSuchModuleError(f'Module {modname!r} does not exist')
return result
def add_module(self, module, modulename):
"""Adds a named module object to this SecNode."""
self.modules[modulename] = module
if module.export:
self.export.append(modulename)
# def remove_module(self, modulename_or_obj):
# moduleobj = self.get_module(modulename_or_obj)
# modulename = moduleobj.name
# if modulename in self.export:
# self.export.remove(modulename)
# self.modules.pop(modulename)
# self._subscriptions.pop(modulename, None)
# for k in [kk for kk in self._subscriptions if kk.startswith(f'{modulename}:')]:
# self._subscriptions.pop(k, None)
def shutdown_modules(self):
"""Call 'shutdownModule' for all modules."""
for name in self._getSortedModules():
self.modules[name].shutdownModule()
def _getSortedModules(self):
"""Sort modules topologically by inverse dependency.
Example: if there is an IO device A and module B depends on it, then
the result will be [B, A].
Right now, if the dependency graph is not a DAG, we give up and return
the unvisited nodes to be dismantled at the end.
Taken from Introduction to Algorithms [CLRS].
"""
def go(name):
if name in done: # visiting a node
return True
if name in visited:
visited.add(name)
return False # cycle in dependencies -> fail
visited.add(name)
if name in unmarked:
unmarked.remove(name)
for module in self.modules[name].attachedModules.values():
res = go(module.name)
if not res:
return False
visited.remove(name)
done.add(name)
l.append(name)
return True
unmarked = set(self.modules.keys()) # unvisited nodes
visited = set() # visited in DFS, but not completed
done = set()
l = [] # list of sorted modules
while unmarked:
if not go(unmarked.pop()):
self.log.error('cyclical dependency between modules!')
return l[::-1] + list(visited) + list(unmarked)
return l[::-1]

View File

@ -25,14 +25,13 @@
import os
import signal
import sys
from collections import OrderedDict
from frappy.config import load_config
from frappy.errors import ConfigError
from frappy.dynamic import Pinata
from frappy.lib import formatException, generalConfig, get_class, mkthread
from frappy.lib.multievent import MultiEvent
from frappy.params import PREDEFINED_ACCESSIBLES
from frappy.secnode import SecNode
try:
from daemon import DaemonContext
@ -175,14 +174,13 @@ class Server:
# server_close() called by 'with'
self.log.info(f'stopped listenning, cleaning up'
f' {len(self.modules)} modules')
f' {len(self.secnode.modules)} modules')
# if systemd:
# if self._restart:
# systemd.daemon.notify('RELOADING=1')
# else:
# systemd.daemon.notify('STOPPING=1')
for name in self._getSortedModules():
self.modules[name].shutdownModule()
self.secnode.shutdown_modules()
if self._restart:
self.restart_hook()
self.log.info('restarting')
@ -209,50 +207,27 @@ class Server:
errors = []
opts = dict(self.node_cfg)
cls = get_class(opts.pop('cls'))
self.dispatcher = cls(opts.pop('name', self._cfgfiles),
self.log.getChild('dispatcher'), opts, self)
name = opts.pop('name', self._cfgfiles)
# TODO: opts not in both
self.secnode = SecNode(name, self.log.getChild('secnode'), opts, self)
self.dispatcher = cls(name, self.log.getChild('dispatcher'), opts, self)
if opts:
self.dispatcher.errors.append(self.unknown_options(cls, opts))
self.modules = OrderedDict()
# create and initialize modules
todos = list(self.module_cfg.items())
while todos:
modname, options = todos.pop(0)
if modname in self.modules:
# already created by Dispatcher (via Attached)
continue
# For Pinata modules: we need to access this in Dispatcher.get_module
self.module_cfg[modname] = dict(options)
modobj = self.dispatcher.get_module_instance(modname) # lazy
if modobj is None:
self.log.debug('Module %s returned None', modname)
continue
self.modules[modname] = modobj
if isinstance(modobj, Pinata):
# scan for dynamic devices
pinata = self.dispatcher.get_module(modname)
pinata_modules = list(pinata.scanModules())
for name, _cfg in pinata_modules:
if name in self.module_cfg:
self.log.error('Module %s, from pinata %s, already'
' exists in config file!', name, modname)
self.log.info('Pinata %s found %d modules', modname, len(pinata_modules))
todos.extend(pinata_modules)
self.secnode.errors.append(self.unknown_options(cls, opts))
self.secnode.create_modules()
# initialize all modules by getting them with Dispatcher.get_module,
# which is done in the get_descriptive data
# TODO: caching, to not make this extra work
self.dispatcher.get_descriptive_data('')
self.secnode.get_descriptive_data('')
# =========== All modules are initialized ===========
# all errors from initialization process
errors = self.dispatcher.errors
errors = self.secnode.errors
if not self._testonly:
start_events = MultiEvent(default_timeout=30)
for modname, modobj in self.modules.items():
for modname, modobj in self.secnode.modules.items():
# startModule must return either a timeout value or None (default 30 sec)
start_events.name = f'module {modname}'
modobj.startModule(start_events)
@ -279,7 +254,8 @@ class Server:
self.log.info('all modules started')
history_path = os.environ.get('FRAPPY_HISTORY')
if history_path:
from frappy_psi.historywriter import FrappyHistoryWriter # pylint: disable=import-outside-toplevel
from frappy_psi.historywriter import \
FrappyHistoryWriter # pylint: disable=import-outside-toplevel
writer = FrappyHistoryWriter(history_path, PREDEFINED_ACCESSIBLES.keys(), self.dispatcher)
# treat writer as a connection
self.dispatcher.add_connection(writer)
@ -292,41 +268,3 @@ class Server:
# history_path = os.environ.get('ALTERNATIVE_HISTORY')
# if history_path:
# from frappy_<xx>.historywriter import ... etc.
def _getSortedModules(self):
"""Sort modules topologically by inverse dependency.
Example: if there is an IO device A and module B depends on it, then
the result will be [B, A].
Right now, if the dependency graph is not a DAG, we give up and return
the unvisited nodes to be dismantled at the end.
Taken from Introduction to Algorithms [CLRS].
"""
def go(name):
if name in done: # visiting a node
return True
if name in visited:
visited.add(name)
return False # cycle in dependencies -> fail
visited.add(name)
if name in unmarked:
unmarked.remove(name)
for module in self.modules[name].attachedModules.values():
res = go(module.name)
if not res:
return False
visited.remove(name)
done.add(name)
l.append(name)
return True
unmarked = set(self.modules.keys()) # unvisited nodes
visited = set() # visited in DFS, but not completed
done = set()
l = [] # list of sorted modules
while unmarked:
if not go(unmarked.pop()):
self.log.error('cyclical dependency between modules!')
return l[::-1] + list(visited) + list(unmarked)
return l[::-1]

View File

@ -253,7 +253,7 @@ class SeaClient(ProxyClient, Module):
if result == '1':
self.asynio.writeline(('get_all_param ' + ' '.join(self.objects)).encode())
else:
self.DISPATCHER.shutdown()
self.secNode.srv.shutdown()
try:
reply = self.asynio.readline()
if reply is None:
@ -314,7 +314,7 @@ class SeaClient(ProxyClient, Module):
if path == '/device/changetime':
recheck = time.time() + 1
elif path.startswith('/device/frappy_%s' % self.service) and value == '':
self.DISPATCHER.shutdown()
self.secNode.srv.shutdown()
else:
for module, param in mplist:
oldv, oldt, oldr = self.cache.get((module, param), [None, None, None])
@ -657,7 +657,7 @@ class SeaModule(Module):
readerror = secop_error(e)
pobj.readerror = readerror
if pobj.export:
self.DISPATCHER.broadcast_event(make_update(self.name, pobj))
self.secNode.srv.dispatcher.broadcast_event(make_update(self.name, pobj))
def initModule(self):
self.io.register_obj(self, self.sea_object)

View File

@ -56,11 +56,23 @@ class LoggerStub:
logger = LoggerStub()
class SecNodeStub:
def __init__(self):
self.modules = {}
def add_module(self, module, modname):
self.modules[modname] = module
def get_module(self, modname):
return self.modules[modname]
class ServerStub:
restart = None
shutdown = None
def __init__(self):
self.secnode = SecNodeStub()
self.dispatcher = Dispatcher('dispatcher', logger, {}, self)
@ -72,6 +84,6 @@ def test_attach():
a = Module('a', logger, {'description': ''}, srv)
m = Mod('m', logger, {'description': '', 'att': 'a'}, srv)
assert m.propertyValues['att'] == 'a'
srv.dispatcher.register_module(a, 'a')
srv.dispatcher.register_module(m, 'm')
srv.secnode.add_module(a, 'a')
srv.secnode.add_module(m, 'm')
assert m.att == a

View File

@ -57,6 +57,7 @@ logger = LoggerStub()
class ServerStub:
def __init__(self, updates):
self.dispatcher = DispatcherStub(updates)
self.secnode = None
class ModuleTest(Module):

View File

@ -28,11 +28,24 @@ import frappy.logging
from frappy.logging import logger, generalConfig, HasComlog
class SecNodeStub:
def __init__(self):
self.modules = {}
self.name = ""
def add_module(self, module, modname):
self.modules[modname] = module
def get_module(self, modname):
return self.modules[modname]
class ServerStub:
restart = None
shutdown = None
def __init__(self):
self.secnode = SecNodeStub()
self.dispatcher = Dispatcher('', logger.log.getChild('dispatcher'), {}, self)
@ -97,7 +110,7 @@ def init_(monkeypatch):
def __init__(self, name, srv, **kwds):
kwds['description'] = ''
super().__init__(name or 'mod', logger.log.getChild(name), kwds, srv)
srv.dispatcher.register_module(self, name, name)
srv.secnode.add_module(self, name)
self.result[:] = []
def earlyInit(self):

View File

@ -64,6 +64,7 @@ logger = LoggerStub()
class ServerStub:
def __init__(self, updates):
self.dispatcher = DispatcherStub(updates)
self.secnode = None
class DummyMultiEvent(threading.Event):
@ -711,6 +712,7 @@ def test_super_call():
class ServerStub1:
def __init__(self, updates):
self.dispatcher = DispatcherStub1(updates)
self.secnode = None
updates = []
srv = ServerStub1(updates)

View File

@ -29,6 +29,10 @@ from frappy.lib import generalConfig
from frappy.persistent import PersistentParam, PersistentMixin
class SecNodeStub:
pass
class DispatcherStub:
def announce_update(self, modulename, pname, pobj):
pass
@ -47,7 +51,8 @@ logger = LoggerStub()
class ServerStub:
def __init__(self, equipment_id):
self.dispatcher = DispatcherStub()
self.dispatcher.equipment_id = equipment_id
self.secnode = SecNodeStub()
self.secnode.equipment_id = equipment_id
class Mod(PersistentMixin, Module):

View File

@ -68,6 +68,7 @@ class ServerStub:
def __init__(self):
generalConfig.testinit()
self.dispatcher = DispatcherStub()
self.secnode = None
class Base(Module):

View File

@ -205,6 +205,7 @@ class DispatcherStub:
class ServerStub:
def __init__(self, updates):
self.dispatcher = DispatcherStub(updates)
self.secnode = None
class Mod(HasStates, Drivable):