add multiplexer/router

In case a remote node is disconnected and can be reconnected quickly,
seamless routing is possible. However, if the description has changed
after the reconnection, the server is restarted.

+ more information in error messages from dispatcher

Change-Id: I0837e0254aee3d12a26481f6fd697081a53aabba
Reviewed-on: https://forge.frm2.tum.de/review/c/sine2020/secop/playground/+/22506
Reviewed-by: Markus Zolliker <markus.zolliker@psi.ch>
Tested-by: JenkinsCodeReview <bjoern_pedersen@frm2.tum.de>
This commit is contained in:
2020-02-24 09:09:53 +01:00
parent 89c98fa716
commit 107e1d84b0
7 changed files with 337 additions and 43 deletions

7
cfg/multiplexer.cfg Normal file
View File

@ -0,0 +1,7 @@
[node router]
type = router
description = router node
nodes = ['localhost:5000', 'localhost:10769']
[interface tcp]
bindto = 0.0.0.0:5001

7
cfg/router.cfg Normal file
View File

@ -0,0 +1,7 @@
[node router]
type = router
description = router node
node = localhost:5000
[interface tcp]
bindto = 0.0.0.0:5001

83
secop/lib/multievent.py Normal file
View File

@ -0,0 +1,83 @@
# -*- coding: utf-8 -*-
# *****************************************************************************
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
#
# *****************************************************************************
import threading
class MultiEvent(threading.Event):
    """Class implementing multi event objects.

    :meth:`new` creates Event like objects
    :meth:`wait` waits for all of them being set
    """

    class SingleEvent:
        """Single Event

        remark: :meth:`wait` is not implemented on purpose
        """

        def __init__(self, multievent):
            self.multievent = multievent
            # a new event starts cleared, i.e. it is pending on the multievent
            self.multievent._clear(self)

        def clear(self):
            """mark this event as pending (again)"""
            self.multievent._clear(self)

        def set(self):
            """mark this event as done"""
            self.multievent._set(self)

        def is_set(self):
            """return True when this event is set, i.e. not pending any more"""
            # multievent.events contains the events which are NOT yet set
            return self not in self.multievent.events

    def __init__(self):
        self.events = set()  # the pending (not yet set) SingleEvents
        self._lock = threading.Lock()  # protects self.events and the flag
        super().__init__()

    def new(self):
        """create a new SingleEvent"""
        return self.SingleEvent(self)

    def set(self):
        """not allowed: the flag is driven by the single events only"""
        raise ValueError('a multievent must not be set directly')

    def clear(self):
        """not allowed: the flag is driven by the single events only"""
        raise ValueError('a multievent must not be cleared directly')

    def _set(self, event):
        """internal: remove event from the event list"""
        with self._lock:
            self.events.discard(event)
            if self.events:
                return  # still waiting for other events
            super().set()

    def _clear(self, event):
        """internal: add event to the event list"""
        with self._lock:
            self.events.add(event)
            super().clear()

    def wait(self, timeout=None):
        """wait until all single events are set

        :param timeout: maximum time to wait in seconds or None (no limit)
        :return: True when all events are set, False on timeout
        """
        if not self.events:  # do not wait if events are empty
            return True
        # propagate the result, so the caller can detect a timeout
        return super().wait(timeout)

View File

@ -42,9 +42,9 @@ import threading
from collections import OrderedDict from collections import OrderedDict
from time import time as currenttime from time import time as currenttime
from secop.errors import SECoPServerError as InternalError
from secop.errors import BadValueError, NoSuchCommandError, NoSuchModuleError, \ from secop.errors import BadValueError, NoSuchCommandError, NoSuchModuleError, \
NoSuchParameterError, ProtocolError, ReadOnlyError, SECoPError NoSuchParameterError, ProtocolError, ReadOnlyError, SECoPServerError, InternalError,\
SECoPError
from secop.params import Parameter from secop.params import Parameter
from secop.protocol.messages import COMMANDREPLY, DESCRIPTIONREPLY, \ from secop.protocol.messages import COMMANDREPLY, DESCRIPTIONREPLY, \
DISABLEEVENTSREPLY, ENABLEEVENTSREPLY, ERRORPREFIX, EVENTREPLY, \ DISABLEEVENTSREPLY, ENABLEEVENTSREPLY, ERRORPREFIX, EVENTREPLY, \
@ -82,6 +82,7 @@ class Dispatcher:
# eventname is <modulename> or <modulename>:<parametername> # eventname is <modulename> or <modulename>:<parametername>
self._subscriptions = {} self._subscriptions = {}
self._lock = threading.RLock() self._lock = threading.RLock()
self.restart = srv.restart
def broadcast_event(self, msg, reallyall=False): def broadcast_event(self, msg, reallyall=False):
"""broadcasts a msg to all active connections """broadcasts a msg to all active connections
@ -157,7 +158,7 @@ class Dispatcher:
return self._modules[modulename] return self._modules[modulename]
if modulename in list(self._modules.values()): if modulename in list(self._modules.values()):
return modulename return modulename
raise NoSuchModuleError('Module does not exist on this SEC-Node!') raise NoSuchModuleError('Module %r does not exist on this SEC-Node!' % modulename)
def remove_module(self, modulename_or_obj): def remove_module(self, modulename_or_obj):
moduleobj = self.get_module(modulename_or_obj) moduleobj = self.get_module(modulename_or_obj)
@ -209,17 +210,17 @@ class Dispatcher:
def _execute_command(self, modulename, exportedname, argument=None): def _execute_command(self, modulename, exportedname, argument=None):
moduleobj = self.get_module(modulename) moduleobj = self.get_module(modulename)
if moduleobj is None: if moduleobj is None:
raise NoSuchModuleError('Module does not exist on this SEC-Node!') raise NoSuchModuleError('Module %r does not exist' % modulename)
cmdname = moduleobj.commands.exported.get(exportedname, None) cmdname = moduleobj.commands.exported.get(exportedname, None)
if cmdname is None: if cmdname is None:
raise NoSuchCommandError('Module has no command %r on this SEC-Node!' % exportedname) raise NoSuchCommandError('Module %r has no command %r' % (modulename, exportedname))
cmdspec = moduleobj.commands[cmdname] cmdspec = moduleobj.commands[cmdname]
if argument is None and cmdspec.datatype.argument is not None: if argument is None and cmdspec.datatype.argument is not None:
raise BadValueError('Command needs an argument!') raise BadValueError("Command '%s:%s' needs an argument" % (modulename, cmdname))
if argument is not None and cmdspec.datatype.argument is None: if argument is not None and cmdspec.datatype.argument is None:
raise BadValueError('Command takes no argument!') raise BadValueError("Command '%s:%s' takes no argument" % (modulename, cmdname))
if cmdspec.datatype.argument: if cmdspec.datatype.argument:
# validate! # validate!
@ -239,16 +240,18 @@ class Dispatcher:
def _setParameterValue(self, modulename, exportedname, value): def _setParameterValue(self, modulename, exportedname, value):
moduleobj = self.get_module(modulename) moduleobj = self.get_module(modulename)
if moduleobj is None: if moduleobj is None:
raise NoSuchModuleError('Module does not exist on this SEC-Node!') raise NoSuchModuleError('Module %r does not exist' % modulename)
pname = moduleobj.parameters.exported.get(exportedname, None) pname = moduleobj.parameters.exported.get(exportedname, None)
if pname is None: if pname is None:
raise NoSuchParameterError('Module has no parameter %r on this SEC-Node!' % exportedname) raise NoSuchParameterError('Module %r has no parameter %r' % (modulename, exportedname))
pobj = moduleobj.parameters[pname] pobj = moduleobj.parameters[pname]
if pobj.constant is not None: if pobj.constant is not None:
raise ReadOnlyError('This parameter is constant and can not be accessed remotely.') raise ReadOnlyError("Parameter %s:%s is constant and can not be changed remotely"
% (modulename, pname))
if pobj.readonly: if pobj.readonly:
raise ReadOnlyError('This parameter can not be changed remotely.') raise ReadOnlyError("Parameter %s:%s can not be changed remotely"
% (modulename, pname))
# validate! # validate!
value = pobj.datatype(value) value = pobj.datatype(value)
@ -264,11 +267,11 @@ class Dispatcher:
def _getParameterValue(self, modulename, exportedname): def _getParameterValue(self, modulename, exportedname):
moduleobj = self.get_module(modulename) moduleobj = self.get_module(modulename)
if moduleobj is None: if moduleobj is None:
raise NoSuchModuleError('Module does not exist on this SEC-Node!') raise NoSuchModuleError('Module %r does not exist' % modulename)
pname = moduleobj.parameters.exported.get(exportedname, None) pname = moduleobj.parameters.exported.get(exportedname, None)
if pname is None: if pname is None:
raise NoSuchParameterError('Module has no parameter %r on this SEC-Node!' % exportedname) raise NoSuchParameterError('Module %r has no parameter %r' % (modulename, exportedname))
pobj = moduleobj.parameters[pname] pobj = moduleobj.parameters[pname]
if pobj.constant is not None: if pobj.constant is not None:
# really needed? we could just construct a readreply instead.... # really needed? we could just construct a readreply instead....
@ -306,7 +309,7 @@ class Dispatcher:
if handler: if handler:
return handler(conn, specifier, data) return handler(conn, specifier, data)
raise InternalError('unhandled message!') raise SECoPServerError('unhandled message: %s' % repr(msg))
# now the (defined) handlers for the different requests # now the (defined) handlers for the different requests
def handle_help(self, conn, specifier, data): def handle_help(self, conn, specifier, data):
@ -328,7 +331,7 @@ class Dispatcher:
return (READREPLY, specifier, list(self._getParameterValue(modulename, pname))) return (READREPLY, specifier, list(self._getParameterValue(modulename, pname)))
def handle_change(self, conn, specifier, data): def handle_change(self, conn, specifier, data):
modulename, pname = specifier, 'value' modulename, pname = specifier, 'target'
if ':' in specifier: if ':' in specifier:
modulename, pname = specifier.split(':', 1) modulename, pname = specifier.split(':', 1)
return (WRITEREPLY, specifier, list(self._setParameterValue(modulename, pname, data))) return (WRITEREPLY, specifier, list(self._setParameterValue(modulename, pname, data)))
@ -352,13 +355,13 @@ class Dispatcher:
if ':' in specifier: if ':' in specifier:
modulename, exportedname = specifier.split(':', 1) modulename, exportedname = specifier.split(':', 1)
if modulename not in self._export: if modulename not in self._export:
raise NoSuchModuleError('Module does not exist on this SEC-Node!') raise NoSuchModuleError('Module %r does not exist' % modulename)
moduleobj = self.get_module(modulename) moduleobj = self.get_module(modulename)
if exportedname is not None: if exportedname is not None:
pname = moduleobj.accessiblename2attr.get(exportedname, True) pname = moduleobj.accessiblename2attr.get(exportedname, True)
if pname and pname not in moduleobj.accessibles: if pname and pname not in moduleobj.accessibles:
# what if we try to subscribe a command here ??? # what if we try to subscribe a command here ???
raise NoSuchParameterError('Module has no such parameter on this SEC-Node!') raise NoSuchParameterError('Module %r has no parameter %r' % (modulename, pname))
modules = [(modulename, pname)] modules = [(modulename, pname)]
else: else:
modules = [(modulename, None)] modules = [(modulename, None)]

View File

@ -221,6 +221,7 @@ class TCPServer(HasProperties, socketserver.ThreadingTCPServer):
self.setProperty('detailed_errors', detailed_errors) self.setProperty('detailed_errors', detailed_errors)
self.checkProperties() self.checkProperties()
self.allow_reuse_address = True
self.log.info("TCPServer %s binding to %s:%d" % (name, self.bindto, self.bindport)) self.log.info("TCPServer %s binding to %s:%d" % (name, self.bindto, self.bindport))
socketserver.ThreadingTCPServer.__init__( socketserver.ThreadingTCPServer.__init__(
self, (self.bindto, self.bindport), TCPRequestHandler, bind_and_activate=True) self, (self.bindto, self.bindport), TCPRequestHandler, bind_and_activate=True)

187
secop/protocol/router.py Normal file
View File

@ -0,0 +1,187 @@
# -*- coding: utf-8 -*-
# *****************************************************************************
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
#
# *****************************************************************************
"""Secop Router
this is a replacement for the standard dispatcher, with the
additional functionality of routing message from/to several other SEC nodes
simplifications:
- module wise activation not supported
- on connection, the description from all nodes are cached and all nodes are activated
- on 'describe' and on 'activate', cached values are returned
- ping is not forwarded
- what to do on a change of descriptive data is not yet implemented
"""
import time
import secop.protocol.dispatcher
import secop.errors
from secop.protocol.messages import DESCRIPTIONREPLY, ENABLEEVENTSREPLY, ERRORPREFIX, EVENTREPLY,\
READREQUEST, WRITEREQUEST, COMMANDREQUEST
import secop.client
from secop.lib.multievent import MultiEvent
class SecopClient(secop.client.SecopClient):
    """client for one routed SEC node, forwarding its updates to the router"""

    # error report (error class, error text) used while the node is disconnected
    DISCONNECTED = ('Communication failed', 'remote SEC node disconnected')

    def __init__(self, uri, log, dispatcher):
        """create the client

        :param uri: the uri of the remote node
        :param log: the logger handed over to the base class
        :param dispatcher: the router; used for broadcasting events and restarting
        """
        self.dispatcher = dispatcher
        super().__init__(uri, log)

    def internalize_name(self, name):
        """do not modify names"""
        return name

    def updateEvent(self, module, parameter, value, timestamp, readerror):
        """broadcast an update (or error update) of <module>:<parameter>

        readerror may be an exception like object (with a name attribute)
        or a tuple (error class, error text) like :attr:`DISCONNECTED`
        """
        specifier = '%s:%s' % (module, parameter)
        if readerror:
            if isinstance(readerror, tuple):
                # nodeStateChange stores DISCONNECTED, which is a plain tuple
                errname, errtext = readerror
            else:
                errname, errtext = readerror.name, str(readerror)
            msg = ERRORPREFIX + EVENTREPLY, specifier, (errname, errtext, dict(t=timestamp))
        else:
            msg = EVENTREPLY, specifier, (value, dict(t=timestamp))
        self.dispatcher.broadcast_event(msg)

    def nodeStateChange(self, online, state):
        """on disconnect, flag all cached values without error as disconnected"""
        if online:
            return
        t = time.time()
        # only existing keys are reassigned, the size of the dict does not change
        for key, (value, _, readerror) in self.cache.items():
            if not readerror:
                self.cache[key] = value, t, self.DISCONNECTED
                self.updateEvent(*key, *self.cache[key])

    def descriptiveDataChange(self, module, data):
        """restart the server, as the description of the remote node has changed"""
        # NOTE(review): assumes the base class keeps the logger as self.log - confirm
        self.log.info('descriptive data of node %r has changed', self.nodename)
        self.dispatcher.restart()
class Router(secop.protocol.dispatcher.Dispatcher):
    """dispatcher routing messages from/to one or several other SEC nodes"""

    singlenode = None  # the only node, when configured with the 'node' option

    def __init__(self, name, logger, options, srv):
        """initialize router

        Use the option node = <uri> for a single node or
        nodes = ["<uri1>", "<uri2>" ...] for multiple nodes.
        If a single node is given, the node properties are forwarded transparently,
        else the description property is a merge from all client node properties.
        """
        uri = options.pop('node', None)
        uris = options.pop('nodes', None)
        if uri and uris:
            raise secop.errors.ConfigError('can not specify node _and_ nodes')
        if not uri and uris is None:
            # without this check, enumerate(None) below fails with a TypeError
            raise secop.errors.ConfigError('need either node or nodes')
        super().__init__(name, logger, options, srv)
        if uri:
            self.nodes = [SecopClient(uri, logger.getChild('routed'), self)]
            self.singlenode = self.nodes[0]
        else:
            self.nodes = [SecopClient(node_uri, logger.getChild('routed%d' % i), self)
                          for i, node_uri in enumerate(uris)]
        # register callbacks
        for node in self.nodes:
            node.register(None, node)

        self.node_by_module = {}
        multievent = MultiEvent()
        for node in self.nodes:
            node.spawn_connect(multievent.new().set)
        multievent.wait(10)  # wait for all nodes started
        online_nodes = []
        for node in self.nodes:
            if node.online:
                for module in node.modules:
                    self.node_by_module[module] = node
                online_nodes.append(node)
            else:
                # self and node are bound as defaults, as node changes within the loop
                def check_new_node(online, state, self=self, node=node):
                    if online:
                        for module in node.modules:
                            self.node_by_module[module] = node
                        self.nodes.append(node)
                        self.restart()
                        return secop.client.UNREGISTER
                    return None

                node.register(None, nodeStateChange=check_new_node)
                logger.warning('can not connect to node %r', node.nodename)
        # keep only connected nodes: the others have no usable description yet
        # and are appended by check_new_node as soon as they get online
        self.nodes = online_nodes

    def _get_node(self, module):
        """return the routed node serving <module>

        raises NoSuchModuleError when no node provides this module
        """
        try:
            return self.node_by_module[module]
        except KeyError:
            raise secop.errors.NoSuchModuleError('Module %r does not exist' % module) from None

    def handle_describe(self, conn, specifier, data):
        """return the cached description (merged, in case of multiple nodes)"""
        if self.singlenode:
            return DESCRIPTIONREPLY, specifier, self.singlenode.descriptive_data
        reply = super().handle_describe(conn, specifier, data)
        result = reply[2]
        allmodules = result.get('modules', {})
        node_description = [result['description']]
        for node in self.nodes:
            nodedata = node.descriptive_data.copy()
            modules = nodedata.pop('modules')
            equipment_id = nodedata.pop('equipment_id', 'unknown')
            node_description.append('--- %s ---\n%s' % (equipment_id, nodedata.pop('description', '')))
            # the remaining node properties are listed as text
            node_description.append('\n'.join('%s: %r' % kv for kv in nodedata.items()))
            for modname, moddesc in modules.items():
                if modname in allmodules:
                    # the first module with a given name wins
                    self.log.info('module %r is already present', modname)
                else:
                    allmodules[modname] = moddesc
        result['modules'] = allmodules
        result['description'] = '\n\n'.join(node_description)
        return DESCRIPTIONREPLY, specifier, result

    def handle_activate(self, conn, specifier, data):
        """activate and broadcast the cached values of all routed nodes"""
        super().handle_activate(conn, specifier, data)
        for node in self.nodes:
            for (module, parameter), (value, t, readerror) in node.cache.items():
                spec = '%s:%s' % (module, parameter)
                if readerror:
                    if isinstance(readerror, tuple):
                        # SecopClient.DISCONNECTED is a tuple (error class, text)
                        errname, errtext = readerror
                    else:
                        errname, errtext = readerror.name, str(readerror)
                    reply = ERRORPREFIX + EVENTREPLY, spec, (errname, errtext, dict(t=t))
                else:
                    datatype = node.modules[module]['parameters'][parameter]['datatype']
                    reply = EVENTREPLY, spec, [datatype.export_value(value), dict(t=t)]
                self.broadcast_event(reply)
        return ENABLEEVENTSREPLY, None, None

    def handle_deactivate(self, conn, specifier, data):
        """deactivate updates (module wise deactivation is not supported)"""
        if specifier:
            raise secop.errors.NotImplementedError('module wise activation not implemented')
        # hand the reply of the standard dispatcher back to the caller
        return super().handle_deactivate(conn, specifier, data)

    def handle_read(self, conn, specifier, data):
        """read a local parameter or forward the request to the serving node"""
        module = specifier.split(':')[0]
        if module in self._modules:
            return super().handle_read(conn, specifier, data)
        node = self._get_node(module)
        if node.online:
            return node.request(READREQUEST, specifier, data)
        return ERRORPREFIX + READREQUEST, specifier, SecopClient.DISCONNECTED + (dict(t=node.disconnect_time),)

    def handle_change(self, conn, specifier, data):
        """change a local parameter or forward the request to the serving node"""
        module = specifier.split(':')[0]
        if module in self._modules:
            return super().handle_change(conn, specifier, data)
        return self._get_node(module).request(WRITEREQUEST, specifier, data)

    def handle_do(self, conn, specifier, data):
        """execute a local command or forward the request to the serving node"""
        module = specifier.split(':')[0]
        if module in self._modules:
            return super().handle_do(conn, specifier, data)
        return self._get_node(module).request(COMMANDREQUEST, specifier, data)

View File

@ -40,7 +40,7 @@ except ImportError:
DaemonContext = None DaemonContext = None
from secop.errors import ConfigError from secop.errors import ConfigError
from secop.lib import formatException, get_class, getGeneralConfig from secop.lib import formatException, get_class, getGeneralConfig, mkthread
from secop.modules import Attached from secop.modules import Attached
@ -53,10 +53,12 @@ class Server:
# IMPORTANT: keep he order! (node MUST be first, as the others are referencing it!) # IMPORTANT: keep he order! (node MUST be first, as the others are referencing it!)
CFGSECTIONS = [ CFGSECTIONS = [
# section_prefix, default type, mapping of selectable classes # section_prefix, default type, mapping of selectable classes
('node', None, {None: "protocol.dispatcher.Dispatcher"}), ('node', 'std', {'std': "protocol.dispatcher.Dispatcher",
'router': 'protocol.router.Router'}),
('module', None, None), ('module', None, None),
('interface', "tcp", {"tcp": "protocol.interface.tcp.TCPServer"}), ('interface', "tcp", {"tcp": "protocol.interface.tcp.TCPServer"}),
] ]
_restart = True
def __init__(self, name, parent_logger=None): def __init__(self, name, parent_logger=None):
cfg = getGeneralConfig() cfg = getGeneralConfig()
@ -78,6 +80,7 @@ class Server:
self._dispatcher = None self._dispatcher = None
self._interface = None self._interface = None
self._restart_event = threading.Event()
def start(self): def start(self):
if not DaemonContext: if not DaemonContext:
@ -96,28 +99,29 @@ class Server:
self.run() self.run()
def run(self): def run(self):
try: while self._restart:
self._processCfg() self._restart = False
except Exception: try:
print(formatException(verbose=True)) self._processCfg()
raise except Exception:
print(formatException(verbose=True))
raise
self.log.info('startup done, handling transport messages') self.log.info('startup done, handling transport messages')
self._threads = set() threads = []
for ifname, ifobj in self.interfaces.items(): for ifname, ifobj in self.interfaces.items():
self.log.debug('starting thread for interface %r' % ifname) self.log.debug('starting thread for interface %r' % ifname)
t = threading.Thread(target=ifobj.serve_forever) threads.append((ifname, mkthread(ifobj.serve_forever)))
t.daemon = True for ifname, t in threads:
t.start() t.join()
self._threads.add(t) self.log.debug('thread for %r died' % ifname)
while self._threads:
time.sleep(1) def restart(self):
for t in self._threads: if not self._restart:
if not t.is_alive(): self._restart = True
self.log.debug('thread %r died (%d still running)' % for ifobj in self.interfaces.values():
(t, len(self._threads))) ifobj.shutdown()
t.join() ifobj.server_close()
self._threads.discard(t)
def _processCfg(self): def _processCfg(self):
self.log.debug('Parse config file %s ...' % self._cfgfile) self.log.debug('Parse config file %s ...' % self._cfgfile)
@ -129,7 +133,7 @@ class Server:
self.log.error('Couldn\'t read cfg file !') self.log.error('Couldn\'t read cfg file !')
raise ConfigError('Couldn\'t read cfg file %r' % self._cfgfile) raise ConfigError('Couldn\'t read cfg file %r' % self._cfgfile)
for kind, devtype, classmapping in self.CFGSECTIONS: for kind, default_type, classmapping in self.CFGSECTIONS:
kinds = '%ss' % kind kinds = '%ss' % kind
objs = OrderedDict() objs = OrderedDict()
self.__dict__[kinds] = objs self.__dict__[kinds] = objs
@ -145,7 +149,7 @@ class Server:
self.log.error('%s %s needs a class option!' % (kind.title(), name)) self.log.error('%s %s needs a class option!' % (kind.title(), name))
raise ConfigError('cfgfile %r: %s %s needs a class option!' % raise ConfigError('cfgfile %r: %s %s needs a class option!' %
(self._cfgfile, kind.title(), name)) (self._cfgfile, kind.title(), name))
type_ = opts.pop('type', devtype) type_ = opts.pop('type', default_type)
cls = classmapping.get(type_, None) cls = classmapping.get(type_, None)
if not cls: if not cls:
self.log.error('%s %s needs a type option (select one of %s)!' % self.log.error('%s %s needs a type option (select one of %s)!' %
@ -185,7 +189,9 @@ class Server:
for modname, modobj in self.modules.items(): for modname, modobj in self.modules.items():
self.log.info('registering module %r' % modname) self.log.info('registering module %r' % modname)
self.dispatcher.register_module(modobj, modname, modobj.properties['export']) self.dispatcher.register_module(modobj, modname, modobj.properties['export'])
modobj.pollerClass.add_to_table(poll_table, modobj) if modobj.pollerClass is not None:
# a module might be explicitly excluded from polling by setting pollerClass to None
modobj.pollerClass.add_to_table(poll_table, modobj)
# also call earlyInit on the modules # also call earlyInit on the modules
modobj.earlyInit() modobj.earlyInit()