Files
frappy/frappy/protocol/router.py
Markus Zolliker fd43687465 equipment_id for merged configs and routed nodes
Add a new custom module property 'original_id' indicating
the equipment_id the module originally belongs to.
This property is only given, when distinct from the equipment_id
of the SEC node.
It happens when multiple config files are given, for all modules
but the ones given in the first file, and for routed modules,
when  multiple nodes are routed or own modules are given.

+ fix an issue in router: additional modules were ignored in case
of a single node.

+ small cosmetic changes in config.py reducing IDE complaints

Change-Id: If846c47a06158629cef807d22b91f69e4f416563
Reviewed-on: https://forge.frm2.tum.de/review/c/secop/frappy/+/35396
Tested-by: Jenkins Automated Tests <pedersen+jenkins@frm2.tum.de>
Reviewed-by: Markus Zolliker <markus.zolliker@psi.ch>
2025-01-17 15:00:20 +01:00

194 lines
8.3 KiB
Python

# *****************************************************************************
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
#
# *****************************************************************************
"""Secop Router
this is a replacement for the standard dispatcher, with the
additional functionality of routing messages from/to several other SEC nodes
simplifications:
- module wise activation not supported
- on connection, the description from all nodes are cached and all nodes are activated
- on 'describe' and on 'activate', cached values are returned
- ping is not forwarded
- what to do on a change of descriptive data is not yet implemented
"""
import time
import frappy.client
import frappy.errors
import frappy.protocol.dispatcher
from frappy.lib.multievent import MultiEvent
from frappy.protocol.messages import COMMANDREQUEST, DESCRIPTIONREPLY, \
ENABLEEVENTSREPLY, ERRORPREFIX, EVENTREPLY, READREQUEST, WRITEREQUEST
class SecopClient(frappy.client.SecopClient):
    """client for one routed SEC node

    Updates received from the remote node are handed over to the given
    dispatcher (the router), which broadcasts them.
    """
    # exception stored in the cache for values of a disconnected node
    disconnectedExc = frappy.errors.CommunicationFailedError('remote SEC node disconnected')
    # the same error as a (name, text) tuple, used to build error replies
    disconnectedError = (disconnectedExc.name, str(disconnectedExc))

    def __init__(self, uri, log, dispatcher):
        """store the dispatcher, then initialize the base client with uri and log"""
        self.dispatcher = dispatcher
        super().__init__(uri, log)

    def internalize_name(self, name):
        """return the name unchanged"""
        return name

    def updateEvent(self, module, parameter, value, timestamp, readerror):
        """broadcast an update (or error update) for <module>:<parameter>

        When readerror is truthy, an error event carrying its name and text
        is sent, else a normal event with the value. In both cases the
        timestamp is passed as qualifier 't'.
        """
        specifier = f'{module}:{parameter}'
        qualifiers = {'t': timestamp}
        if readerror:
            action = ERRORPREFIX + EVENTREPLY
            payload = (readerror.name, str(readerror), qualifiers)
        else:
            action = EVENTREPLY
            payload = (value, qualifiers)
        self.dispatcher.broadcast_event((action, specifier, payload))

    def nodeStateChange(self, online, state):
        """on going offline, mark all error free cache entries as disconnected

        Each modified entry is broadcast as an error update. Nothing is done
        when the node is online.
        """
        now = time.time()
        if online:
            return
        for key, (value, _, readerror) in self.cache.items():
            if readerror:
                # entry is already in error state: leave it untouched
                continue
            self.cache[key] = value, now, self.disconnectedExc
            self.updateEvent(*key, *self.cache[key])

    def descriptiveDataChange(self, module, data):
        """on a change concerning the whole node (module is None): restart

        The dispatcher is asked to restart, this client is flagged as shut
        down and a SECoPError is raised. Nothing is done for a change of a
        single module.
        """
        if module is not None:
            return
        self.dispatcher.restart()
        self._shutdown = True
        raise frappy.errors.SECoPError(f'descriptive data for node {self.nodename!r} has changed')
class Router(frappy.protocol.dispatcher.Dispatcher):
    """dispatcher routing requests to remote SEC nodes

    Requests for modules of the SEC node itself are handled by the base
    class, requests for all other modules are forwarded to the routed node
    registered for the module in node_by_module.
    """

    def __init__(self, name, logger, options, srv):
        """initialize router

        Use the option node = <uri> for a single node or
        nodes = ["<uri1>", "<uri2>" ...] for multiple nodes.
        If a single node is given, and no more additional modules are given,
        the node properties are forwarded transparently,
        else the description property is a merge from all client node properties.

        After initialization, self.nodes contains only the nodes which could
        be connected. A node connecting later is added and triggers a restart.

        :raises frappy.errors.ConfigError: when neither 'node' is a string nor
            'nodes' is a list of strings, or when both options are given
        """
        uri = options.pop('node', None)
        uris = options.pop('nodes', None)
        try:
            if uris is not None:
                if isinstance(uris, str) or not all(isinstance(v, str) for v in uris) or uri:
                    raise TypeError()
            elif isinstance(uri, str):
                uris = [uri]
            else:
                raise TypeError()
        except Exception as e:
            # also catches the TypeError raised when 'nodes' is not iterable
            raise frappy.errors.ConfigError("a router needs either 'node' as a string"
                                            " or 'nodes' as a list of strings") from e
        super().__init__(name, logger, options, srv)
        clients = [SecopClient(uri, logger.getChild(f'routed{i}'), self) for i, uri in enumerate(uris)]
        self.nodes = clients
        # register callbacks
        for node in clients:
            node.register_callback(None, node.updateEvent, node.descriptiveDataChange, node.nodeStateChange)

        self.restart = srv.restart
        self.node_by_module = {}
        multievent = MultiEvent()
        for node in clients:
            node.spawn_connect(multievent.new().set)
        multievent.wait(10)  # wait for all nodes started

        # from here on, self.nodes holds connected nodes only; it is filled in
        # place, so that a late connecting node appending itself is not lost
        self.nodes = []
        for node in clients:
            if node.online:
                for module in node.modules:
                    self.node_by_module[module] = node
                self.nodes.append(node)
            else:
                # NOTE(review): the function name is kept as is - presumably
                # register_callback selects the callback kind by name; verify
                def nodeStateChange(online, state, self=self, node=node):
                    """add the node as soon as it gets online and restart"""
                    if online:
                        for module in node.modules:
                            self.node_by_module[module] = node
                        self.nodes.append(node)
                        self.restart()
                        raise frappy.client.UnregisterCallback()

                node.register_callback(None, nodeStateChange)
                logger.warning('can not connect to node %r', node.nodename)

    def handle_describe(self, conn, specifier, data):
        """return the description reply

        With exactly one routed node and no own modules, the descriptive data
        of this node is returned unchanged. Else the modules of all routed
        nodes are merged into the own description, each routed module getting
        the property 'original_id' (unless already present), and the node
        descriptions and remaining node properties are appended to the
        description text.
        """
        if len(self.nodes) == 1 and not self.secnode.modules:
            return DESCRIPTIONREPLY, specifier, self.nodes[0].descriptive_data
        reply = super().handle_describe(conn, specifier, data)
        result = reply[2]
        allmodules = result.get('modules', {})
        node_description = [result['description']]
        for node in self.nodes:
            node_data = node.descriptive_data.copy()
            modules = node_data.pop('modules')
            equipment_id = node_data.pop('equipment_id', 'unknown')
            node_description.append(f"--- {equipment_id} ---\n{node_data.pop('description', '')}")
            # all remaining node properties, one per line
            node_description.append('\n'.join(f'{key}: {value!r}' for key, value in node_data.items()))
            for modname, moddesc in modules.items():
                if modname in allmodules:
                    self.log.info('module %r is already present', modname)
                else:
                    # work on a copy, so that the description cached in the
                    # client is not modified
                    moddesc = dict(moddesc)
                    moddesc.setdefault('original_id', equipment_id)
                    allmodules[modname] = moddesc
        result['modules'] = allmodules
        result['description'] = '\n\n'.join(node_description)
        return DESCRIPTIONREPLY, specifier, result

    def handle_activate(self, conn, specifier, data):
        """activate and broadcast the cached values of all routed nodes

        After the activation by the base class, an update (or error update)
        is broadcast for every cache entry of every routed node.
        """
        super().handle_activate(conn, specifier, data)
        for node in self.nodes:
            for (module, parameter), (value, t, readerror) in node.cache.items():
                spec = f'{module}:{parameter}'
                if readerror:
                    reply = ERRORPREFIX + EVENTREPLY, spec, (readerror.name, str(readerror), {'t': t})
                else:
                    datatype = node.modules[module]['parameters'][parameter]['datatype']
                    reply = EVENTREPLY, spec, [datatype.export_value(value), {'t': t}]
                self.broadcast_event(reply)
        return ENABLEEVENTSREPLY, None, None

    def handle_deactivate(self, conn, specifier, data):
        """deactivate updates

        :raises frappy.errors.NotImplementedError: when a specifier is given
        :return: the result of the base class handler
        """
        if specifier:
            raise frappy.errors.NotImplementedError('module wise activation not implemented')
        # hand the result of the base class back, as all other handlers do
        return super().handle_deactivate(conn, specifier, data)

    def handle_read(self, conn, specifier, data):
        """read from an own module or forward the request to the routed node

        For a routed node which is not online, an error reply with the
        disconnected error and the disconnect time as qualifier 't' is returned.
        """
        module = specifier.split(':')[0]
        if module in self.secnode.modules:
            return super().handle_read(conn, specifier, data)
        node = self.node_by_module[module]
        if node.online:
            return node.request(READREQUEST, specifier, data)
        return ERRORPREFIX + READREQUEST, specifier, SecopClient.disconnectedError + ({'t': node.disconnect_time},)

    def handle_change(self, conn, specifier, data):
        """change on an own module or forward the request to the routed node"""
        module = specifier.split(':')[0]
        if module in self.secnode.modules:
            return super().handle_change(conn, specifier, data)
        return self.node_by_module[module].request(WRITEREQUEST, specifier, data)

    def handle_do(self, conn, specifier, data):
        """execute a command on an own module or forward it to the routed node"""
        module = specifier.split(':')[0]
        if module in self.secnode.modules:
            return super().handle_do(conn, specifier, data)
        return self.node_by_module[module].request(COMMANDREQUEST, specifier, data)