frappy/secop/protocol/dispatcher.py
Enrico Faulhaber 8efd1df92c fix isort
Change-Id: I5486e1f9b3143c6bb0804c49c7ca21adb9b84de1
Reviewed-on: https://forge.frm2.tum.de/review/19524
Reviewed-by: Enrico Faulhaber <enrico.faulhaber@frm2.tum.de>
Tested-by: Enrico Faulhaber <enrico.faulhaber@frm2.tum.de>
2018-12-04 10:18:55 +01:00

371 lines
15 KiB
Python

# -*- coding: utf-8 -*-
# *****************************************************************************
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Enrico Faulhaber <enrico.faulhaber@frm2.tum.de>
#
# *****************************************************************************
"""Dispatcher for SECoP Messages
Interface to the service offering part:
- 'handle_request(connectionobj, data)' handles incoming request
will call 'queue_request(data)' on connectionobj before returning
- 'add_connection(connectionobj)' registers new connection
- 'remove_connection(connectionobj)' removes now longer functional connection
- may at any time call 'queue_async_request(connobj, data)' on the connobj
Interface to the modules:
- add_module(modulename, moduleobj, export=True) registers a new module under the
given name, may also register it for exporting (making accessible)
- get_module(modulename) returns the requested module or None
- remove_module(modulename_or_obj): removes the module (during shutdown)
"""
from __future__ import division, print_function
import threading
from time import time as currenttime
from secop.errors import SECoPServerError as InternalError
from secop.errors import (BadValueError, NoSuchCommandError, NoSuchModuleError,
NoSuchParameterError, ProtocolError, ReadOnlyError)
from secop.params import Parameter
from secop.protocol.messages import (COMMANDREPLY, DESCRIPTIONREPLY,
DISABLEEVENTSREPLY, ENABLEEVENTSREPLY,
EVENTREPLY, HEARTBEATREPLY, IDENTREPLY,
IDENTREQUEST, WRITEREPLY)
try:
unicode('a')
except NameError:
# no unicode on py3
unicode = str # pylint: disable=redefined-builtin
class Dispatcher(object):
def __init__(self, name, logger, options, srv):
# to avoid errors, we want to eat all options here
self.equipment_id = name
self.nodeprops = {}
for k in list(options):
self.nodeprops[k] = options.pop(k)
self.log = logger
# map ALL modulename -> moduleobj
self._modules = {}
# list of EXPORTED modules
self._export = []
# list all connections
self._connections = []
# active (i.e. broadcast-receiving) connections
self._active_connections = set()
# map eventname -> list of subscribed connections
# eventname is <modulename> or <modulename>:<parametername>
self._subscriptions = {}
self._lock = threading.RLock()
def broadcast_event(self, msg, reallyall=False):
"""broadcasts a msg to all active connections
used from the dispatcher"""
if reallyall:
listeners = self._connections
else:
# all subscribers to module:param
listeners = self._subscriptions.get(msg[1], set()).copy()
# all subscribers to module
module = msg[1].split(':', 1)[0]
listeners.update(self._subscriptions.get(module, set()))
# all generic subscribers
listeners.update(self._active_connections)
for conn in listeners:
conn.queue_async_reply(msg)
def announce_update(self, moduleobj, pname, pobj):
"""called by modules param setters to notify subscribers of new values
"""
msg = (EVENTREPLY, u'%s:%s' % (moduleobj.name, pname), [pobj.export_value(), dict(t=pobj.timestamp)])
self.broadcast_event(msg)
def subscribe(self, conn, modulename, pname=None):
eventname = modulename
if pname:
eventname = u'%s:%s' % (modulename, pname)
self._subscriptions.setdefault(eventname, set()).add(conn)
def unsubscribe(self, conn, modulename, pname=None):
if pname:
eventname = u'%s:%s' % (modulename, pname)
else:
eventname = modulename
# also remove 'more specific' subscriptions
for k, v in self._subscriptions.items():
if k.startswith(u'%s:' % modulename):
v.discard(conn)
if eventname in self._subscriptions:
self._subscriptions[eventname].discard(conn)
def add_connection(self, conn):
"""registers new connection"""
self._connections.append(conn)
def remove_connection(self, conn):
"""removes now longer functional connection"""
if conn in self._connections:
self._connections.remove(conn)
for _evt, conns in list(self._subscriptions.items()):
conns.discard(conn)
self._active_connections.discard(conn)
def register_module(self, moduleobj, modulename, export=True):
self.log.debug(u'registering module %r as %s (export=%r)' %
(moduleobj, modulename, export))
self._modules[modulename] = moduleobj
if export:
self._export.append(modulename)
def get_module(self, modulename):
if modulename in self._modules:
return self._modules[modulename]
elif modulename in list(self._modules.values()):
return modulename
raise NoSuchModuleError('Module does not exist on this SEC-Node!')
def remove_module(self, modulename_or_obj):
moduleobj = self.get_module(modulename_or_obj)
modulename = moduleobj.name
if modulename in self._export:
self._export.remove(modulename)
self._modules.pop(modulename)
self._subscriptions.pop(modulename, None)
for k in [k for k in self._subscriptions if k.startswith(u'%s:' % modulename)]:
self._subscriptions.pop(k, None)
def list_module_names(self):
# return a copy of our list
return self._export[:]
def export_accessibles(self, modulename):
self.log.debug(u'export_accessibles(%r)' % modulename)
if modulename in self._export:
# omit export=False params!
res = []
for aname, aobj in self.get_module(modulename).accessibles.items():
if aobj.export:
res.append([aname, aobj.for_export()])
self.log.debug(u'list accessibles for module %s -> %r' %
(modulename, res))
return res
self.log.debug(u'-> module is not to be exported!')
return {}
def get_descriptive_data(self):
"""returns a python object which upon serialisation results in the descriptive data"""
# XXX: be lazy and cache this?
# format: {[{[{[, specific entries first
result = {u'modules': []}
for modulename in self._export:
module = self.get_module(modulename)
if not module.properties.get('export', False):
continue
# some of these need rework !
mod_desc = {u'accessibles': self.export_accessibles(modulename)}
for propname, prop in list(module.properties.items()):
if propname == 'export':
continue
mod_desc[propname] = prop
result[u'modules'].append([modulename, mod_desc])
result[u'equipment_id'] = self.equipment_id
result[u'firmware'] = u'FRAPPY - The Python Framework for SECoP'
result[u'version'] = u'2018.09'
result.update(self.nodeprops)
return result
def _execute_command(self, modulename, command, argument=None):
moduleobj = self.get_module(modulename)
if moduleobj is None:
raise NoSuchModuleError('Module does not exist on this SEC-Node!')
cmdspec = moduleobj.accessibles.get(command, None)
if cmdspec is None:
raise NoSuchCommandError('Module has no such command!')
if argument is None and cmdspec.datatype.argtype is not None:
raise BadValueError(u'Command needs an argument!')
if argument is not None and cmdspec.datatype.argtype is None:
raise BadValueError(u'Command takes no argument!')
# now call func and wrap result as value
# note: exceptions are handled in handle_request, not here!
func = getattr(moduleobj, u'do_' + command)
res = func(argument) if argument else func()
# XXX: pipe through cmdspec.datatype.result ?
return res, dict(t=currenttime())
def _setParameterValue(self, modulename, pname, value):
moduleobj = self.get_module(modulename)
if moduleobj is None:
raise NoSuchModuleError('Module does not exist on this SEC-Node!')
pobj = moduleobj.accessibles.get(pname, None)
if pobj is None or not isinstance(pobj, Parameter):
raise NoSuchParameterError('Module has no such parameter on this SEC-Node!')
if pobj.readonly:
raise ReadOnlyError('This parameter can not be changed remotely.')
writefunc = getattr(moduleobj, u'write_%s' % pname, None)
# note: exceptions are handled in handle_request, not here!
if writefunc:
value = writefunc(value)
else:
setattr(moduleobj, pname, value)
if pobj.timestamp:
return pobj.export_value(), dict(t=pobj.timestamp)
return pobj.export_value(), {}
def _getParameterValue(self, modulename, pname):
moduleobj = self.get_module(modulename)
if moduleobj is None:
raise NoSuchModuleError('Module does not exist on this SEC-Node!')
pobj = moduleobj.accessibles.get(pname, None)
if pobj is None or not isinstance(pobj, Parameter):
raise NoSuchParameterError('Module has no such parameter on this SEC-Node!')
readfunc = getattr(moduleobj, u'read_%s' % pname, None)
if readfunc:
# should also update the pobj (via the setter from the metaclass)
# note: exceptions are handled in handle_request, not here!
readfunc()
if pobj.timestamp:
return pobj.export_value(), dict(t=pobj.timestamp)
return pobj.export_value(), {}
#
# api to be called from the 'interface'
# any method above has no idea about 'messages', this is handled here
#
def handle_request(self, conn, msg):
"""handles incoming request
will call 'queue_async_request(data)' on conn or return reply
"""
self.log.debug(u'Dispatcher: handling msg: %s' % repr(msg))
# play thread safe !
# XXX: ONLY ONE REQUEST (per dispatcher) AT A TIME
with self._lock:
action, specifier, data = msg
# special case for *IDN?
if action == IDENTREQUEST:
action, specifier, data = '_ident', None, None
self.log.debug(u'Looking for handle_%s' % action)
handler = getattr(self, u'handle_%s' % action, None)
if handler:
return handler(conn, specifier, data)
else:
raise InternalError('unhandled message!')
# now the (defined) handlers for the different requests
def handle_help(self, conn, specifier, data):
self.log.error('should have been handled in the interface!')
def handle__ident(self, conn, specifier, data):
return (IDENTREPLY, None, None)
def handle_describe(self, conn, specifier, data):
return (DESCRIPTIONREPLY, '.', self.get_descriptive_data())
def handle_read(self, conn, specifier, data):
if data:
raise ProtocolError('poll request don\'t take data!')
modulename, pname = specifier, u'value'
if ':' in specifier:
modulename, pname = specifier.split(':', 1)
# XXX: trigger polling and force sending event ???
return (EVENTREPLY, specifier, list(self._getParameterValue(modulename, pname)))
def handle_change(self, conn, specifier, data):
modulename, pname = specifier, u'value'
if ':' in specifier:
modulename, pname = specifier.split(u':', 1)
return (WRITEREPLY, specifier, list(self._setParameterValue(modulename, pname, data)))
def handle_do(self, conn, specifier, data):
# XXX: should this be done asyncron? we could just return the reply in
# that case
modulename, cmd = specifier.split(u':', 1)
return (COMMANDREPLY, specifier, list(self._execute_command(modulename, cmd, data)))
def handle_ping(self, conn, specifier, data):
if data:
raise ProtocolError('poll request don\'t take data!')
return (HEARTBEATREPLY, specifier, [None, {u't':currenttime()}])
def handle_activate(self, conn, specifier, data):
if data:
raise ProtocolError('activate request don\'t take data!')
if specifier:
modulename, pname = specifier, None
if ':' in specifier:
modulename, pname = specifier.split(u':', 1)
if modulename not in self._export:
raise NoSuchModuleError('Module does not exist on this SEC-Node!')
if pname and pname not in self.get_module(modulename).accessibles:
# what if we try to subscribe a command here ???
raise NoSuchParameterError('Module has no such parameter on this SEC-Node!')
# activate only ONE module
self.subscribe(conn, modulename, pname)
modules = [(modulename, pname)]
else:
# activate all modules
self._active_connections.add(conn)
modules = [(m, None) for m in self._export]
# send updates for all subscribed values.
# note: The initial poll already happend before the server is active
for modulename, pname in modules:
moduleobj = self._modules.get(modulename, None)
if pname:
pobj = moduleobj.accessibles[pname]
updmsg = (EVENTREPLY, u'%s:%s' % (modulename, pname),
[pobj.export_value(), dict(t=pobj.timestamp)])
conn.queue_async_reply(updmsg)
continue
for pname, pobj in moduleobj.accessibles.items(): # pylint: disable=redefined-outer-name
if not isinstance(pobj, Parameter):
continue
if not pobj.export: # XXX: handle export_as cases!
continue
# can not use announce_update here, as this will send to all clients
updmsg = (EVENTREPLY, u'%s:%s' % (modulename, pname),
[pobj.export_value(), dict(t=pobj.timestamp)])
conn.queue_async_reply(updmsg)
return (ENABLEEVENTSREPLY, specifier, None) if specifier else (ENABLEEVENTSREPLY, None, None)
def handle_deactivate(self, conn, specifier, data):
if specifier:
self.unsubscribe(conn, specifier)
else:
self._active_connections.discard(conn)
# XXX: also check all entries in self._subscriptions?
return (DISABLEEVENTSREPLY, None, None)