From 107e1d84b03a47bbf54be0dc67dd6f0dad226908 Mon Sep 17 00:00:00 2001 From: Markus Zolliker Date: Mon, 24 Feb 2020 09:09:53 +0100 Subject: [PATCH] add multiplexer/router in case a remote is disconnected and can be reconnected quickly, a seamless routing is possible. However, when after reconnection the description has changed, the server is restarted. + more information in error messages from dispatcher Change-Id: I0837e0254aee3d12a26481f6fd697081a53aabba Reviewed-on: https://forge.frm2.tum.de/review/c/sine2020/secop/playground/+/22506 Reviewed-by: Markus Zolliker Tested-by: JenkinsCodeReview --- cfg/multiplexer.cfg | 7 ++ cfg/router.cfg | 7 ++ secop/lib/multievent.py | 83 ++++++++++++++ secop/protocol/dispatcher.py | 37 ++++--- secop/protocol/interface/tcp.py | 1 + secop/protocol/router.py | 187 ++++++++++++++++++++++++++++++++ secop/server.py | 58 +++++----- 7 files changed, 337 insertions(+), 43 deletions(-) create mode 100644 cfg/multiplexer.cfg create mode 100644 cfg/router.cfg create mode 100644 secop/lib/multievent.py create mode 100644 secop/protocol/router.py diff --git a/cfg/multiplexer.cfg b/cfg/multiplexer.cfg new file mode 100644 index 0000000..e06dc62 --- /dev/null +++ b/cfg/multiplexer.cfg @@ -0,0 +1,7 @@ +[node router] +type = router +description = router node +nodes = ['localhost:5000', 'localhost:10769'] + +[interface tcp] +bindto = 0.0.0.0:5001 diff --git a/cfg/router.cfg b/cfg/router.cfg new file mode 100644 index 0000000..b0e182c --- /dev/null +++ b/cfg/router.cfg @@ -0,0 +1,7 @@ +[node router] +type = router +description = router node +node = localhost:5000 + +[interface tcp] +bindto = 0.0.0.0:5001 diff --git a/secop/lib/multievent.py b/secop/lib/multievent.py new file mode 100644 index 0000000..6b3337e --- /dev/null +++ b/secop/lib/multievent.py @@ -0,0 +1,83 @@ +# -*- coding: utf-8 -*- +# ***************************************************************************** +# +# This program is free software; you can redistribute it and/or modify 
# it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
#
# Module authors:
#   Markus Zolliker
#
# *****************************************************************************

import threading


class MultiEvent(threading.Event):
    """Event which counts as set only when all of its sub-events are set.

    :meth:`new` creates Event-like objects (:class:`SingleEvent`)
    :meth:`wait` waits for all of them being set

    ``events`` holds the sub-events which are currently *pending* (cleared).
    The underlying :class:`threading.Event` flag is set when the last pending
    sub-event is set, and cleared as soon as any sub-event is cleared.
    """

    class SingleEvent:
        """Single Event

        remark: :meth:`wait` is not implemented on purpose
        """

        def __init__(self, multievent):
            """create the sub-event in cleared (pending) state"""
            self.multievent = multievent
            self.multievent._clear(self)

        def clear(self):
            """mark this sub-event as pending again"""
            self.multievent._clear(self)

        def set(self):
            """mark this sub-event as done"""
            self.multievent._set(self)

        def is_set(self):
            """return True when this sub-event is set

            a sub-event is set exactly when it is NOT in the pending set
            """
            return self not in self.multievent.events

    def __init__(self):
        self.events = set()  # pending (not yet set) sub-events
        self._lock = threading.Lock()  # protects self.events
        super().__init__()

    def new(self):
        """create a new SingleEvent"""
        return self.SingleEvent(self)

    def set(self):
        """not allowed: the state is driven by the sub-events only"""
        raise ValueError('a multievent must not be set directly')

    def clear(self):
        """not allowed: the state is driven by the sub-events only"""
        raise ValueError('a multievent must not be cleared directly')

    def _set(self, event):
        """internal: remove event from the event list"""
        with self._lock:
            self.events.discard(event)
            if self.events:
                return
            # update the flag while holding the lock, so that a concurrent
            # _clear can not be overridden by a late set
            super().set()

    def _clear(self, event):
        """internal: add event to the event list"""
        with self._lock:
            self.events.add(event)
            super().clear()

    def wait(self, timeout=None):
        """wait until all sub-events are set

        :param timeout: maximum time to wait in seconds (None: no limit)
        :return: True when all sub-events are set (or none was ever created),
            False when the timeout expired before
        """
        with self._lock:
            if not self.events:  # do not wait if events are empty
                return True
        return super().wait(timeout)
% exportedname) + raise NoSuchCommandError('Module %r has no command %r' % (modulename, exportedname)) cmdspec = moduleobj.commands[cmdname] if argument is None and cmdspec.datatype.argument is not None: - raise BadValueError('Command needs an argument!') + raise BadValueError("Command '%s:%s' needs an argument" % (modulename, cmdname)) if argument is not None and cmdspec.datatype.argument is None: - raise BadValueError('Command takes no argument!') + raise BadValueError("Command '%s:%s' takes no argument" % (modulename, cmdname)) if cmdspec.datatype.argument: # validate! @@ -239,16 +240,18 @@ class Dispatcher: def _setParameterValue(self, modulename, exportedname, value): moduleobj = self.get_module(modulename) if moduleobj is None: - raise NoSuchModuleError('Module does not exist on this SEC-Node!') + raise NoSuchModuleError('Module %r does not exist' % modulename) pname = moduleobj.parameters.exported.get(exportedname, None) if pname is None: - raise NoSuchParameterError('Module has no parameter %r on this SEC-Node!' % exportedname) + raise NoSuchParameterError('Module %r has no parameter %r' % (modulename, exportedname)) pobj = moduleobj.parameters[pname] if pobj.constant is not None: - raise ReadOnlyError('This parameter is constant and can not be accessed remotely.') + raise ReadOnlyError("Parameter %s:%s is constant and can not be changed remotely" + % (modulename, pname)) if pobj.readonly: - raise ReadOnlyError('This parameter can not be changed remotely.') + raise ReadOnlyError("Parameter %s:%s can not be changed remotely" + % (modulename, pname)) # validate! 
value = pobj.datatype(value) @@ -264,11 +267,11 @@ class Dispatcher: def _getParameterValue(self, modulename, exportedname): moduleobj = self.get_module(modulename) if moduleobj is None: - raise NoSuchModuleError('Module does not exist on this SEC-Node!') + raise NoSuchModuleError('Module %r does not exist' % modulename) pname = moduleobj.parameters.exported.get(exportedname, None) if pname is None: - raise NoSuchParameterError('Module has no parameter %r on this SEC-Node!' % exportedname) + raise NoSuchParameterError('Module %r has no parameter %r' % (modulename, exportedname)) pobj = moduleobj.parameters[pname] if pobj.constant is not None: # really needed? we could just construct a readreply instead.... @@ -306,7 +309,7 @@ class Dispatcher: if handler: return handler(conn, specifier, data) - raise InternalError('unhandled message!') + raise SECoPServerError('unhandled message: %s' % repr(msg)) # now the (defined) handlers for the different requests def handle_help(self, conn, specifier, data): @@ -328,7 +331,7 @@ class Dispatcher: return (READREPLY, specifier, list(self._getParameterValue(modulename, pname))) def handle_change(self, conn, specifier, data): - modulename, pname = specifier, 'value' + modulename, pname = specifier, 'target' if ':' in specifier: modulename, pname = specifier.split(':', 1) return (WRITEREPLY, specifier, list(self._setParameterValue(modulename, pname, data))) @@ -352,13 +355,13 @@ class Dispatcher: if ':' in specifier: modulename, exportedname = specifier.split(':', 1) if modulename not in self._export: - raise NoSuchModuleError('Module does not exist on this SEC-Node!') + raise NoSuchModuleError('Module %r does not exist' % modulename) moduleobj = self.get_module(modulename) if exportedname is not None: pname = moduleobj.accessiblename2attr.get(exportedname, True) if pname and pname not in moduleobj.accessibles: # what if we try to subscribe a command here ??? 
- raise NoSuchParameterError('Module has no such parameter on this SEC-Node!') + raise NoSuchParameterError('Module %r has no parameter %r' % (modulename, pname)) modules = [(modulename, pname)] else: modules = [(modulename, None)] diff --git a/secop/protocol/interface/tcp.py b/secop/protocol/interface/tcp.py index 615225d..6e9463b 100644 --- a/secop/protocol/interface/tcp.py +++ b/secop/protocol/interface/tcp.py @@ -221,6 +221,7 @@ class TCPServer(HasProperties, socketserver.ThreadingTCPServer): self.setProperty('detailed_errors', detailed_errors) self.checkProperties() + self.allow_reuse_address = True self.log.info("TCPServer %s binding to %s:%d" % (name, self.bindto, self.bindport)) socketserver.ThreadingTCPServer.__init__( self, (self.bindto, self.bindport), TCPRequestHandler, bind_and_activate=True) diff --git a/secop/protocol/router.py b/secop/protocol/router.py new file mode 100644 index 0000000..019fc20 --- /dev/null +++ b/secop/protocol/router.py @@ -0,0 +1,187 @@ +# -*- coding: utf-8 -*- +# ***************************************************************************** +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free Software +# Foundation; either version 2 of the License, or (at your option) any later +# version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. 
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
#
# Module authors:
#   Markus Zolliker
#
# *****************************************************************************
"""Secop Router

this is a replacement for the standard dispatcher, with the
additional functionality of routing message from/to several other SEC nodes

simplifications:
- module wise activation not supported
- on connection, the description from all nodes are cached and all nodes are activated
- on 'describe' and on 'activate', cached values are returned
- ping is not forwarded
- what to do on a change of descriptive data is not yet implemented
"""

import time

import secop.protocol.dispatcher
import secop.errors
from secop.protocol.messages import DESCRIPTIONREPLY, ENABLEEVENTSREPLY, ERRORPREFIX, EVENTREPLY,\
    READREQUEST, WRITEREQUEST, COMMANDREQUEST
import secop.client
from secop.lib.multievent import MultiEvent


class SecopClient(secop.client.SecopClient):
    """client for one routed SEC node, forwarding its updates to the router"""

    # (error class name, error text) stored in the cache when the node is lost
    DISCONNECTED = ('Communication failed', 'remote SEC node disconnected')

    def __init__(self, uri, log, dispatcher):
        """initialize the client

        :param uri: address of the remote SEC node
        :param log: logger handed over to the base class
        :param dispatcher: the router receiving the events of this node
        """
        self.dispatcher = dispatcher
        super().__init__(uri, log)

    def internalize_name(self, name):
        """do not modify names"""
        return name

    @staticmethod
    def error_report(readerror):
        """return (error name, error text) for a cached read error

        a cached read error is either an exception like object with a
        ``name`` attribute or the plain :attr:`DISCONNECTED` tuple, which
        has no such attribute
        """
        if isinstance(readerror, tuple):
            return readerror[0], readerror[1]
        return readerror.name, str(readerror)

    def updateEvent(self, module, parameter, value, timestamp, readerror):
        """callback: broadcast a value or error update to all clients of the router"""
        specifier = '%s:%s' % (module, parameter)
        if readerror:
            errname, errtext = self.error_report(readerror)
            msg = ERRORPREFIX + EVENTREPLY, specifier, (errname, errtext, dict(t=timestamp))
        else:
            msg = EVENTREPLY, specifier, (value, dict(t=timestamp))
        self.dispatcher.broadcast_event(msg)

    def nodeStateChange(self, online, state):
        """callback: on disconnect, mark all valid cached values as disconnected"""
        if online:
            return
        t = time.time()
        # iterate over a snapshot, as cache entries are replaced in the loop
        for key, (value, _, readerror) in list(self.cache.items()):
            if not readerror:
                self.cache[key] = value, t, self.DISCONNECTED
                self.updateEvent(*key, *self.cache[key])

    def descriptiveDataChange(self, module, data):
        """callback: the description of the remote node changed -> restart the server"""
        # NOTE(review): debugging output, should presumably go to the logger
        print('CHANGE', self.nodename)
        self.dispatcher.restart()


class Router(secop.protocol.dispatcher.Dispatcher):
    """dispatcher routing requests for remote modules to other SEC nodes"""

    singlenode = None  # the only routed node, when configured with 'node'

    def __init__(self, name, logger, options, srv):
        """initialize router

        Use the option node = <uri> for a single node or
        nodes = ["<uri1>", "<uri2>" ...] for multiple nodes.
        If a single node is given, the node properties are forwarded transparently,
        else the description property is a merge from all client node properties.

        :raises ConfigError: when both or none of node / nodes are given
        """
        uri = options.pop('node', None)
        uris = options.pop('nodes', None)
        if uri and uris:
            raise secop.errors.ConfigError('can not specify node _and_ nodes')
        if not uri and uris is None:
            # without this check, enumerate(None) below raises a TypeError
            raise secop.errors.ConfigError('either node or nodes must be specified')
        super().__init__(name, logger, options, srv)
        if uri:
            self.nodes = [SecopClient(uri, logger.getChild('routed'), self)]
            self.singlenode = self.nodes[0]
        else:
            self.nodes = [SecopClient(uri, logger.getChild('routed%d' % i), self) for i, uri in enumerate(uris)]
        # register callbacks
        for node in self.nodes:
            node.register(None, node)
        self.node_by_module = {}
        multievent = MultiEvent()
        for node in self.nodes:
            node.spawn_connect(multievent.new().set)
        multievent.wait(10)  # wait for all nodes started
        nodes = []
        for node in self.nodes:
            if node.online:
                for module in node.modules:
                    self.node_by_module[module] = node
                nodes.append(node)
            else:

                # self and node are bound as defaults to avoid late binding
                def check_new_node(online, state, self=self, node=node):
                    if online:
                        for module in node.modules:
                            self.node_by_module[module] = node
                        self.nodes.append(node)
                        self.restart()
                        return secop.client.UNREGISTER
                    return None

                node.register(None, nodeStateChange=check_new_node)
                logger.warning('can not connect to node %r', node.nodename)
        # keep only the connected nodes: the others have nothing cached yet
        # and are appended by check_new_node as soon as they get online
        self.nodes = nodes

    def _get_node(self, module):
        """return the remote node serving module

        :raises NoSuchModuleError: when no routed node has such a module
        """
        try:
            return self.node_by_module[module]
        except KeyError:
            raise secop.errors.NoSuchModuleError('Module %r does not exist' % module) from None

    def handle_describe(self, conn, specifier, data):
        """return the cached description (single node) or a merge of all descriptions"""
        if self.singlenode:
            return DESCRIPTIONREPLY, specifier, self.singlenode.descriptive_data
        reply = super().handle_describe(conn, specifier, data)
        result = reply[2]
        allmodules = result.get('modules', {})
        node_description = [result['description']]
        for node in self.nodes:
            nodedata = node.descriptive_data.copy()
            modules = nodedata.pop('modules')
            equipment_id = nodedata.pop('equipment_id', 'unknown')
            node_description.append('--- %s ---\n%s' % (equipment_id, nodedata.pop('description', '')))
            # the remaining node properties are listed as plain text
            node_description.append('\n'.join('%s: %r' % kv for kv in nodedata.items()))
            for modname, moddesc in modules.items():
                if modname in allmodules:
                    # first come, first served: do not overwrite a module
                    self.log.info('module %r is already present', modname)
                else:
                    allmodules[modname] = moddesc
        result['modules'] = allmodules
        result['description'] = '\n\n'.join(node_description)
        return DESCRIPTIONREPLY, specifier, result

    def handle_activate(self, conn, specifier, data):
        """activate local modules and send the cached values of all remote nodes"""
        super().handle_activate(conn, specifier, data)
        for node in self.nodes:
            for (module, parameter), (value, t, readerror) in node.cache.items():
                spec = '%s:%s' % (module, parameter)
                if readerror:
                    errname, errtext = SecopClient.error_report(readerror)
                    reply = ERRORPREFIX + EVENTREPLY, spec, (errname, errtext, dict(t=t))
                else:
                    datatype = node.modules[module]['parameters'][parameter]['datatype']
                    reply = EVENTREPLY, spec, [datatype.export_value(value), dict(t=t)]
                self.broadcast_event(reply)
        return ENABLEEVENTSREPLY, None, None

    def handle_deactivate(self, conn, specifier, data):
        """deactivate updates for the whole node (module wise is not supported)"""
        if specifier:
            raise secop.errors.NotImplementedError('module wise activation not implemented')
        # hand the reply of the base class back to the requester
        return super().handle_deactivate(conn, specifier, data)

    def handle_read(self, conn, specifier, data):
        """read a local parameter or forward the request to the remote node"""
        module = specifier.split(':')[0]
        if module in self._modules:
            return super().handle_read(conn, specifier, data)
        node = self._get_node(module)
        if node.online:
            return node.request(READREQUEST, specifier, data)
        return ERRORPREFIX + READREQUEST, specifier, SecopClient.DISCONNECTED + (dict(t=node.disconnect_time),)

    def handle_change(self, conn, specifier, data):
        """change a local parameter or forward the request to the remote node"""
        module = specifier.split(':')[0]
        if module in self._modules:
            return super().handle_change(conn, specifier, data)
        return self._get_node(module).request(WRITEREQUEST, specifier, data)

    def handle_do(self, conn, specifier, data):
        """execute a local command or forward the request to the remote node"""
        module = specifier.split(':')[0]
        if module in self._modules:
            return super().handle_do(conn, specifier, data)
        return self._get_node(module).request(COMMANDREQUEST, specifier, data)
threading.Thread(target=ifobj.serve_forever) - t.daemon = True - t.start() - self._threads.add(t) - while self._threads: - time.sleep(1) - for t in self._threads: - if not t.is_alive(): - self.log.debug('thread %r died (%d still running)' % - (t, len(self._threads))) - t.join() - self._threads.discard(t) + self.log.info('startup done, handling transport messages') + threads = [] + for ifname, ifobj in self.interfaces.items(): + self.log.debug('starting thread for interface %r' % ifname) + threads.append((ifname, mkthread(ifobj.serve_forever))) + for ifname, t in threads: + t.join() + self.log.debug('thread for %r died' % ifname) + + def restart(self): + if not self._restart: + self._restart = True + for ifobj in self.interfaces.values(): + ifobj.shutdown() + ifobj.server_close() def _processCfg(self): self.log.debug('Parse config file %s ...' % self._cfgfile) @@ -129,7 +133,7 @@ class Server: self.log.error('Couldn\'t read cfg file !') raise ConfigError('Couldn\'t read cfg file %r' % self._cfgfile) - for kind, devtype, classmapping in self.CFGSECTIONS: + for kind, default_type, classmapping in self.CFGSECTIONS: kinds = '%ss' % kind objs = OrderedDict() self.__dict__[kinds] = objs @@ -145,7 +149,7 @@ class Server: self.log.error('%s %s needs a class option!' % (kind.title(), name)) raise ConfigError('cfgfile %r: %s %s needs a class option!' % (self._cfgfile, kind.title(), name)) - type_ = opts.pop('type', devtype) + type_ = opts.pop('type', default_type) cls = classmapping.get(type_, None) if not cls: self.log.error('%s %s needs a type option (select one of %s)!' 
% @@ -185,7 +189,9 @@ class Server: for modname, modobj in self.modules.items(): self.log.info('registering module %r' % modname) self.dispatcher.register_module(modobj, modname, modobj.properties['export']) - modobj.pollerClass.add_to_table(poll_table, modobj) + if modobj.pollerClass is not None: + # a module might be explicitly excluded from polling by setting pollerClass to None + modobj.pollerClass.add_to_table(poll_table, modobj) # also call earlyInit on the modules modobj.earlyInit()