implement SECoP proxy modules

A proxy module is a module with a known structure, but
accessed over a SECoP connection.
For the configuration, a Frappy module class has to be given.
The proxy class is created from this, but does not inherit from it.
However, the class of the returned object will be subclass of the
SECoP base classes (Readable, Drivable etc.).
A possible extension might be, that instead of the Frappy class,
the JSON module description can be given, as a separate file
or directly in the config file.
Or we might offer a tool to convert the JSON description to
a python class.

Change-Id: I9212d9f3fe82ec56dfc08611d0e1efc0b0112271
Reviewed-on: https://forge.frm2.tum.de/review/c/sine2020/secop/playground/+/22386
Tested-by: JenkinsCodeReview <bjoern_pedersen@frm2.tum.de>
Reviewed-by: Markus Zolliker <markus.zolliker@psi.ch>
This commit is contained in:
2020-01-30 10:24:40 +01:00
parent 9825b9c135
commit 97034fb998
12 changed files with 550 additions and 114 deletions

View File

@ -64,8 +64,8 @@ class Logger:
error = warning = critical = info
class CallbackMixin:
"""abstract mixin
class CallbackObject:
"""abstract definition for a target object for callbacks
this is mainly for documentation, but it might be extended
and used as a mixin for objects registered as a callback
@ -94,33 +94,115 @@ class CallbackMixin:
"""
class SecopClient:
class ProxyClient:
"""common functionality for proxy clients"""
CALLBACK_NAMES = ('updateEvent', 'descriptiveDataChange', 'nodeStateChange', 'unhandledMessage')
online = False # connected or reconnecting since a short time
validate_data = False
_state = 'disconnected' # further possible values: 'connecting', 'reconnecting', 'connected'
def __init__(self):
self.callbacks = {cbname: defaultdict(list) for cbname in self.CALLBACK_NAMES}
# caches (module, parameter) = value, timestamp, readerror (internal names!)
self.cache = {}
def register(self, key, obj=None, **kwds):
"""register callback functions
- kwds each key must be a valid callback name defined in self.CALLBACK_NAMES
- kwds values are the callback functions
- if obj is not None, use its methods named from the callback name, if not given in kwds
- key might be either:
1) None: general callback (all callbacks)
2) <module name>: callbacks related to a module (not called for 'unhandledMessage')
3) (<module name>, <parameter name>): callback for specified parameter (only called for 'updateEvent')
"""
for cbname in self.CALLBACK_NAMES:
cbfunc = kwds.pop(cbname, None)
if obj and cbfunc is None:
cbfunc = getattr(obj, cbname, None)
if not cbfunc:
continue
cbdict = self.callbacks[cbname]
cbdict[key].append(cbfunc)
# immediately call for some callback types
if cbname == 'updateEvent':
if key is None:
for (mname, pname), data in self.cache.items():
cbfunc(mname, pname, *data)
else:
data = self.cache.get(key, None)
if data:
cbfunc(*key, *data) # case single parameter
else: # case key = module
for (mname, pname), data in self.cache.items():
if mname == key:
cbfunc(mname, pname, *data)
elif cbname == 'nodeStateChange':
cbfunc(self.online, self._state)
if kwds:
raise TypeError('unknown callback: %s' % (', '.join(kwds)))
def callback(self, key, cbname, *args):
"""perform callbacks
key=None:
key=<module name>: callbacks for specified module
key=(<module name>, <parameter name): callbacks for specified parameter
"""
cblist = self.callbacks[cbname].get(key, [])
self.callbacks[cbname][key] = [cb for cb in cblist if cb(*args) is not UNREGISTER]
return bool(cblist)
def updateValue(self, module, param, value, timestamp, readerror):
if readerror:
assert isinstance(readerror, Exception)
if self.validate_data:
try:
# try to validate, reason: make enum_members from integers
datatype = self.modules[module]['parameters'][param]['datatype']
value = datatype(value)
except (KeyError, ValueError):
pass
self.cache[(module, param)] = (value, timestamp, readerror)
self.callback(None, 'updateEvent', module, param, value, timestamp, readerror)
self.callback(module, 'updateEvent', module, param, value, timestamp, readerror)
self.callback((module, param), 'updateEvent', module, param, value, timestamp, readerror)
def getParameter(self, module, parameter, trycache=False):
if trycache:
cached = self.cache.get((module, parameter), None)
if cached:
return cached
if self.online:
self.readParameter(module, parameter)
return self.cache[module, parameter]
def readParameter(self, module, parameter):
"""forced read over connection"""
raise NotImplementedError()
class SecopClient(ProxyClient):
"""a general SECoP client"""
reconnect_timeout = 10
shutdown = False
_rxthread = None
_txthread = None
_state = 'disconnected' # further possible values: 'connecting', 'reconnecting', 'connected'
online = False # connected or reconnecting since a short time
disconnect_time = 0 # time of last disconnect
secop_version = ''
_rxbuffer = b''
descriptive_data = {}
CALLBACK_NAMES = 'updateEvent', 'nodeStateChange', 'unhandledMessage', 'descriptiveDataChange', 'handleMessage'
callbacks = {}
modules = {}
_last_error = None
validate_data = False
def __init__(self, uri, log=Logger):
"""like __init__, but called from SecopClient.__new__"""
super().__init__()
# maps expected replies to [request, Event, is_error, result] until a response came
# there can only be one entry per thread calling 'request'
self.active_requests = {}
# caches (module, parameter) = value, timestamp, readerror (internal names!)
self.cache = {}
self.io = None
self.callbacks = {cbname: defaultdict(list) for cbname in self.CALLBACK_NAMES}
self.txq = queue.Queue(30) # queue for tx requests
self.pending = queue.Queue(30) # requests with colliding action + ident
self.log = log
@ -223,14 +305,14 @@ class SecopClient:
if module_param is not None:
if action.startswith(ERRORPREFIX):
timestamp = data[2].get('t', None)
readerror = tuple(data[0:2])
readerror = secop.errors.make_secop_error(*data[0:2])
value = None
else:
timestamp = data[1].get('t', None)
value = data[0]
readerror = None
module, param = module_param
self._update_value(module, param, value, timestamp, readerror)
self.updateValue(module, param, value, timestamp, readerror)
if action in (EVENTREPLY, ERRORPREFIX + EVENTREPLY):
continue
try:
@ -359,84 +441,25 @@ class SecopClient:
self.modules[modname] = dict(accessibles=accessibles, parameters=parameters,
commands=commands, properties=properties)
if changed_modules is not None:
done = self.node_callback('descriptiveDataChange', None, self)
done = self.callback(None, 'descriptiveDataChange', None, self)
for mname in changed_modules:
if not self.module_callback('descriptiveDataChange', mname, mname, self):
if not self.callback(mname, 'descriptiveDataChange', mname, self):
self.log.warning('descriptive data changed on module %r', mname)
done = True
if not done:
self.log.warning('descriptive data of %r changed', self.nodename)
def register(self, obj=None, module=None, **kwds):
"""register callback functions
- kwds keys must be valid callback name defined in self.CALLBACK_NAMES
- kwds names are the callback functions
- if obj is not None, use its methods named from the callback name, if not given in kwds
- module may be a module name. if not None and not omitted, the registered callback will
be called only when it is related to the given module
"""
for cbname in self.CALLBACK_NAMES:
cbfunc = kwds.pop(cbname, None)
if obj and cbfunc is None:
cbfunc = getattr(obj, cbname, None)
if not cbfunc:
continue
cbdict = self.callbacks[cbname]
cbdict[module].append(cbfunc)
if cbname == 'updateEvent':
if module is None:
for (mname, pname), data in self.cache.items():
cbfunc(mname, pname, *data)
else:
for (mname, pname), data in self.cache.items():
if mname == module:
cbfunc(mname, pname, *data)
elif cbname == 'nodeStateChange':
cbfunc(self.online, self._state)
if kwds:
raise TypeError('unknown callback: %s' % (', '.join(kwds)))
def node_callback(self, cbname, *args):
cblist = self.callbacks[cbname].get(None, [])
self.callbacks[cbname][None] = [cb for cb in cblist if cb(*args) is not UNREGISTER]
return bool(cblist)
def module_callback(self, cbname, mname, *args):
cblist = self.callbacks[cbname].get(mname, [])
self.callbacks[cbname][mname] = [cb for cb in cblist if cb(*args) is not UNREGISTER]
return bool(cblist)
def _update_value(self, module, param, value, timestamp, readerror):
if readerror:
assert isinstance(readerror, tuple)
if self.validate_data:
try:
# try to validate, reason: make enum_members from integers
datatype = self.modules[module]['parameters'][param]['datatype']
value = datatype(value)
except (KeyError, ValueError):
pass
self.cache[(module, param)] = (value, timestamp, readerror)
self.node_callback('updateEvent', module, param, value, timestamp, readerror)
self.module_callback('updateEvent', module, module, param, value, timestamp, readerror)
def _unhandled_message(self, action, ident, data):
mname = None
if ident:
mname = ident.split(':')[0]
done = self.node_callback('unhandledMessage', action, ident, data)
done = self.module_callback('unhandledMessage', mname, action, ident, data) or done
if not done:
if not self.callback(None, 'unhandledMessage', action, ident, data):
self.log.warning('unhandled message: %s %s %r' % (action, ident, data))
def _set_state(self, online, state=None):
# treat reconnecting as online!
self._state = state or self._state
self.online = online
self.node_callback('nodeStateChange', self.online, self._state)
self.callback(None, 'nodeStateChange', self.online, self._state)
for mname in self.modules:
self.module_callback('nodeStateChange', mname, self.online, self._state)
self.callback(mname, 'nodeStateChange', self.online, self._state)
def queue_request(self, action, ident=None, data=None):
"""make a request"""
@ -449,7 +472,7 @@ class SecopClient:
def get_reply(self, entry):
"""wait for reply and return it"""
if not entry[1].wait(10): # entry
if not entry[1].wait(10): # event
raise TimeoutError('no response within 10s')
if not entry[2]: # reply
raise ConnectionError('connection closed before reply')
@ -467,18 +490,13 @@ class SecopClient:
entry = self.queue_request(action, ident, data)
return self.get_reply(entry)
def getParameter(self, module, parameter, trycache=False):
if trycache:
cached = self.cache.get((module, parameter), None)
if cached:
return cached
if self.online:
try:
self.request(READREQUEST, self.identifier[module, parameter])
except secop.errors.SECoPError:
# error reply message is already stored as readerror in cache
pass
return self.cache[module, parameter]
def readParameter(self, module, parameter):
try:
self.request(READREQUEST, self.identifier[module, parameter])
except secop.errors.SECoPError:
# error reply message is already stored as readerror in cache
pass
return self.cache.get((module, parameter), None)
def setParameter(self, module, parameter, value):
self.connect() # make sure we are connected