# -*- coding: utf-8 -*-
# *****************************************************************************
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
#   Enrico Faulhaber
#   Markus Zolliker
#
# *****************************************************************************
"""general SECoP client"""

import time
import queue
import json
from threading import Event, RLock
from collections import defaultdict

from secop.lib import mkthread, formatExtendedTraceback, formatExtendedStack
from secop.lib.asynconn import AsynConn, ConnectionClosed
from secop.datatypes import get_datatype
from secop.protocol.interface import encode_msg_frame, decode_msg
from secop.protocol.messages import REQUEST2REPLY, ERRORPREFIX, EVENTREPLY, WRITEREQUEST, WRITEREPLY, \
    READREQUEST, READREPLY, IDENTREQUEST, IDENTPREFIX, ENABLEEVENTSREQUEST, COMMANDREQUEST, DESCRIPTIONREQUEST
import secop.errors
import secop.params

# replies to be handled for cache
UPDATE_MESSAGES = {EVENTREPLY, READREPLY, WRITEREPLY, ERRORPREFIX + READREQUEST, ERRORPREFIX + EVENTREPLY}


class UNREGISTER:
    """a magic value, used as a returned value in a callback

    to indicate it has to be unregistered
    used to implement one shot callbacks
    """


class Logger:
    """dummy logger, in case not provided from caller"""

    @staticmethod
    def info(fmt, *args, **kwds):
        """print the message, formatted with args if any are given"""
        msg = str(fmt)
        if args:
            # format only when arguments are given (as the logging module does),
            # so that an already formatted message containing '%' does not raise
            msg = msg % args
        print(msg)

    @staticmethod
    def noop(fmt, *args, **kwds):
        """ignore the message"""

    debug = noop
    error = warning = critical = info


class CallbackMixin:
    """abstract mixin

    this is mainly for documentation, but it might be extended
    and used as a mixin for objects registered as a callback
    """

    def updateEvent(self, module, parameter, value, timestamp, readerror):
        """called whenever a value is changed

        or when new callbacks are registered
        """

    def unhandledMessage(self, action, ident, data):
        """called on an unhandled message"""

    def nodeStateChange(self, online, state):
        """called when the state of the connection changes

        'online' is True when connected or reconnecting, False when disconnected or connecting
        'state' is the connection state as a string
        """

    def descriptiveDataChange(self, module, description):
        """called when the description has changed

        this callback is called on the node with module=None
        and on every changed module with module=<module name>
        the client passes itself as the 'description' argument
        """


class SecopClient:
    """a general SECoP client"""
    reconnect_timeout = 10
    shutdown = False
    _rxthread = None
    _txthread = None
    _state = 'disconnected'  # further possible values: 'connecting', 'reconnecting', 'connected'
    online = False  # connected or reconnecting since a short time
    disconnect_time = 0  # time of last disconnect
    secop_version = ''
    _rxbuffer = b''
    descriptive_data = {}
    CALLBACK_NAMES = 'updateEvent', 'nodeStateChange', 'unhandledMessage', 'descriptiveDataChange', 'handleMessage'
    callbacks = {}
    modules = {}
    _last_error = None
    validate_data = False

    def __init__(self, uri, log=Logger):
        """initialize the client for the node at <uri>

        no connection is opened here, see connect / spawn_connect

        :param uri: the address of the node, handed over to AsynConn on connect
        :param log: a logger like object (default: the dummy Logger of this module)
        """
        # maps expected replies (reply action, ident) to entries [request, Event, reply]
        # until a response came
        # there can only be one entry per thread calling 'request'
        self.active_requests = {}
        # caches (module, parameter) = value, timestamp, readerror (internal names!)
        self.cache = {}
        self.io = None
        self.callbacks = {cbname: defaultdict(list) for cbname in self.CALLBACK_NAMES}
        self.txq = queue.Queue(30)  # queue for tx requests
        self.pending = queue.Queue(30)  # requests with colliding action + ident
        self.log = log
        self.uri = uri
        self.nodename = uri
        self._lock = RLock()

    def __del__(self):
        try:
            self.disconnect()
        except Exception:
            # best effort only: nothing useful can be done with an error here
            pass

    def connect(self, try_period=0):
        """establish connection

        if a <try_period> is given, repeat trying for the given time (sec)

        returns immediately when self.io is already set
        the last exception is re-raised when the connection could not be
        established within <try_period>
        """
        with self._lock:
            if self.io:
                return
            if self.online:
                self._set_state(True, 'reconnecting')
            else:
                self._set_state(False, 'connecting')
            deadline = time.time() + try_period
            while True:
                try:
                    self.io = AsynConn(self.uri)  # timeout 1 sec
                    self.io.writeline(IDENTREQUEST.encode('utf-8'))
                    reply = self.io.readline(10)
                    if reply:
                        self.secop_version = reply.decode('utf-8')
                    else:
                        raise self.error_map('HardwareError')('no answer to %s' % IDENTREQUEST)
                    if not self.secop_version.startswith(IDENTPREFIX):
                        raise self.error_map('HardwareError')('bad answer to %s: %r' %
                                                              (IDENTREQUEST, self.secop_version))
                    # now its safe to do secop stuff
                    self._rxthread = mkthread(self.__rxthread)
                    self._txthread = mkthread(self.__txthread)
                    self.log.debug('connected to %s', self.uri)
                    # pylint: disable=unsubscriptable-object
                    self._init_descriptive_data(self.request(DESCRIPTIONREQUEST)[2])
                    self.nodename = self.properties.get('equipment_id', self.uri)
                    if self.activate:
                        self.request(ENABLEEVENTSREQUEST)
                    self._set_state(True, 'connected')
                    break
                except Exception:
                    # print(formatExtendedTraceback())
                    # NOTE(review): self.io is not reset when a step after creating
                    # the connection fails; a later connect() call would then return
                    # early at 'if self.io' - confirm whether it should be closed here
                    if time.time() > deadline:
                        # stay online for now, if activated
                        self._set_state(self.online and self.activate, 'disconnected')
                        raise
                    time.sleep(1)
            self.log.info('%s ready', self.nodename)

    def __txthread(self):
        """transmitter thread: send the requests queued in self.txq

        a None entry in the queue is the shutdown marker
        """
        while not self.shutdown:
            entry = self.txq.get()
            if entry is None:
                break
            request = entry[0]
            reply_action = REQUEST2REPLY.get(request[0], None)
            if reply_action:
                key = (reply_action, request[1])  # action and identifier
            else:
                # allow experimental unknown requests, but only one at a time
                key = None
            if key in self.active_requests:
                # store to requeue after the next reply was received
                self.pending.put(entry)
            else:
                self.active_requests[key] = entry
                line = encode_msg_frame(*request)
                self.log.debug('TX: %r', line)
                self.io.send(line)
        self._txthread = None
        self.disconnect()

    def __rxthread(self):
        """receiver thread: update the cache and dispatch replies to waiting requests

        after the loop has ended, a reconnect is started when self.activate is set
        """
        while not self.shutdown:
            try:
                reply = self.io.readline()
                if reply is None:
                    continue
            except ConnectionClosed:
                break
            action, ident, data = decode_msg(reply)
            if ident == '.':
                ident = None
            if action in UPDATE_MESSAGES:
                module_param = self.internal.get(ident, None)
                # ident may be None here (see above): it must be checked before
                # using it as a string, else the thread dies with a TypeError
                if module_param is None and ident and ':' not in ident:
                    # allow missing ':value'/':target'
                    if action == WRITEREPLY:
                        module_param = self.internal.get(ident + ':target', None)
                    else:
                        module_param = self.internal.get(ident + ':value', None)
                if module_param is not None:
                    if action.startswith(ERRORPREFIX):
                        timestamp = data[2].get('t', None)
                        readerror = tuple(data[0:2])
                        value = None
                    else:
                        timestamp = data[1].get('t', None)
                        value = data[0]
                        readerror = None
                    module, param = module_param
                    self._update_value(module, param, value, timestamp, readerror)
                    if action in (EVENTREPLY, ERRORPREFIX + EVENTREPLY):
                        # events are not a reply to a request
                        continue
            try:
                key = action, ident
                entry = self.active_requests.pop(key)
            except KeyError:
                if action.startswith(ERRORPREFIX):
                    # an error reply: look for the request with the matching reply
                    try:
                        key = REQUEST2REPLY[action[len(ERRORPREFIX):]], ident
                    except KeyError:
                        key = None
                    entry = self.active_requests.pop(key, None)
                else:
                    # this may be a response to the last unknown request
                    key = None
                    entry = self.active_requests.pop(key, None)
            if entry is None:
                self._unhandled_message(action, ident, data)
                continue
            entry[2] = action, ident, data
            entry[1].set()  # trigger event
            while not self.pending.empty():
                # let the TX thread sort out which entry to treat
                # this may have bad performance, but happens rarely
                self.txq.put(self.pending.get())
        self._rxthread = None
        self.disconnect()
        if self.activate:
            self.log.info('reconnect to %s', self.uri)
            mkthread(self._reconnect)
        else:
            self.log.warning('%s disconnected', self.uri)
            self._set_state(False, 'disconnected')

    def spawn_connect(self, connected_callback=None):
        """try to connect in background

        and call <connected_callback> (if given) when connected
        """
        self.disconnect_time = time.time()
        mkthread(self._reconnect, connected_callback)

    def _reconnect(self, connected_callback=None):
        """try to connect until successful

        an error is logged only when its first line differs from the previous one
        """
        while True:
            try:
                self.connect()
                if connected_callback:
                    connected_callback()
                break
            except Exception as e:
                txt = str(e).split('\n', 1)[0]
                if txt != self._last_error:
                    self._last_error = txt
                    self.log.error(str(e))
                if time.time() > self.disconnect_time + self.reconnect_timeout:
                    if self.online:  # was recently connected
                        self.disconnect_time = 0
                        self.log.warning('can not reconnect to %s (%r)', self.nodename, e)
                        # self.log.warning(formatExtendedTraceback())
                        self._set_state(False, 'disconnected')
                    time.sleep(self.reconnect_timeout)
                else:
                    time.sleep(1)

    def disconnect(self):
        """stop the threads, close the connection and abort all waiting requests"""
        self.shutdown = True
        self.disconnect_time = time.time()
        if self._txthread:
            self.txq.put(None)  # shutdownmarker
            self._txthread.join()
            self._txthread = None
        if self._rxthread:
            self._rxthread.join()
            self._rxthread = None
        if self.io:
            self.io.disconnect()
        self.io = None
        # abort pending requests early
        # the events are set without a reply stored in the entry,
        # get_reply raises a ConnectionError in this case
        try:  # avoid race condition
            while self.active_requests:
                _, (_, event, _) = self.active_requests.popitem()
                event.set()
        except KeyError:
            pass
        try:
            while True:
                _, event, _ = self.pending.get(block=False)
                event.set()
        except queue.Empty:
            pass
        self.shutdown = False

    def _init_descriptive_data(self, data):
        """rebuild descriptive data

        the descriptiveDataChange callbacks are called, when a description was
        already present and the new one differs from it
        """
        changed_modules = None
        if json.dumps(data, sort_keys=True) != json.dumps(self.descriptive_data, sort_keys=True):
            if self.descriptive_data:
                changed_modules = set()
                modules = data.get('modules', {})
                for modname, moddesc in self.descriptive_data['modules'].items():
                    if json.dumps(moddesc, sort_keys=True) != json.dumps(modules.get(modname), sort_keys=True):
                        changed_modules.add(modname)
        self.descriptive_data = data
        modules = data['modules']
        self.modules = {}
        self.properties = {k: v for k, v in data.items() if k != 'modules'}
        self.identifier = {}  # map (module, parameter) -> identifier
        self.internal = {}  # map identifier -> (module, parameter)
        for modname, moddescr in modules.items():
            # separate accessibles into command and parameters
            parameters = {}
            commands = {}
            accessibles = moddescr['accessibles']
            for aname, aentry in accessibles.items():
                aentry = dict(aentry, datatype=get_datatype(aentry['datainfo']))
                iname = self.internalize_name(aname)
                ident = '%s:%s' % (modname, aname)
                self.identifier[modname, iname] = ident
                self.internal[ident] = modname, iname
                if aentry['datainfo']['type'] == 'command':
                    commands[iname] = aentry
                else:
                    parameters[iname] = aentry
            properties = {k: v for k, v in moddescr.items() if k != 'accessibles'}
            self.modules[modname] = dict(accessibles=accessibles, parameters=parameters,
                                         commands=commands, properties=properties)
        if changed_modules is not None:
            done = self.node_callback('descriptiveDataChange', None, self)
            for mname in changed_modules:
                if not self.module_callback('descriptiveDataChange', mname, mname, self):
                    self.log.warning('descriptive data changed on module %r', mname)
                    done = True
            if not done:
                self.log.warning('descriptive data of %r changed', self.nodename)

    def register(self, obj=None, module=None, **kwds):
        """register callback functions

        - kwds keys must be valid callback name defined in self.CALLBACK_NAMES
        - kwds values are the callback functions
        - if obj is not None, use its methods named from the callback name, if not given in kwds
        - module may be a module name. if not None and not omitted, the registered callback will
          be called only when it is related to the given module

        a new updateEvent callback is called immediately with the matching cached values,
        a new nodeStateChange callback is called immediately with the current state

        raises TypeError on kwds keys not in self.CALLBACK_NAMES
        """
        for cbname in self.CALLBACK_NAMES:
            cbfunc = kwds.pop(cbname, None)
            if obj and cbfunc is None:
                cbfunc = getattr(obj, cbname, None)
            if not cbfunc:
                continue
            cbdict = self.callbacks[cbname]
            cbdict[module].append(cbfunc)
            if cbname == 'updateEvent':
                # iterate over a snapshot: the receiver thread may add
                # new items to the cache while the callbacks are running
                for (mname, pname), data in list(self.cache.items()):
                    if module is None or mname == module:
                        cbfunc(mname, pname, *data)
            elif cbname == 'nodeStateChange':
                cbfunc(self.online, self._state)
        if kwds:
            raise TypeError('unknown callback: %s' % (', '.join(kwds)))

    def node_callback(self, cbname, *args):
        """call the callbacks registered without module

        callbacks returning UNREGISTER are removed
        returns True when at least one callback was registered
        """
        cblist = self.callbacks[cbname].get(None, [])
        self.callbacks[cbname][None] = [cb for cb in cblist if cb(*args) is not UNREGISTER]
        return bool(cblist)

    def module_callback(self, cbname, mname, *args):
        """call the callbacks registered for module <mname>

        callbacks returning UNREGISTER are removed
        returns True when at least one callback was registered
        """
        cblist = self.callbacks[cbname].get(mname, [])
        self.callbacks[cbname][mname] = [cb for cb in cblist if cb(*args) is not UNREGISTER]
        return bool(cblist)

    def _update_value(self, module, param, value, timestamp, readerror):
        """store a value in the cache and call the updateEvent callbacks"""
        if readerror:
            assert isinstance(readerror, tuple)
        if self.validate_data:
            try:
                # try to validate, reason: make enum_members from integers
                datatype = self.modules[module]['parameters'][param]['datatype']
                value = datatype(value)
            except (KeyError, ValueError):
                pass
        self.cache[(module, param)] = (value, timestamp, readerror)
        self.node_callback('updateEvent', module, param, value, timestamp, readerror)
        self.module_callback('updateEvent', module, module, param, value, timestamp, readerror)

    def _unhandled_message(self, action, ident, data):
        """call the unhandledMessage callbacks, log a warning if there are none"""
        mname = None
        if ident:
            mname = ident.split(':')[0]
        done = self.node_callback('unhandledMessage', action, ident, data)
        if mname is not None:
            # without a module name, module_callback would address the node
            # callbacks again and call them a second time
            done = self.module_callback('unhandledMessage', mname, action, ident, data) or done
        if not done:
            self.log.warning('unhandled message: %s %s %r', action, ident, data)

    def _set_state(self, online, state=None):
        """set the connection state and call the nodeStateChange callbacks

        the state string is kept, when <state> is not given
        """
        # treat reconnecting as online!
        self._state = state or self._state
        self.online = online
        self.node_callback('nodeStateChange', self.online, self._state)
        for mname in self.modules:
            self.module_callback('nodeStateChange', mname, self.online, self._state)

    def queue_request(self, action, ident=None, data=None):
        """make a request

        returns the entry [request, Event, reply] to be used with get_reply
        """
        request = action, ident, data
        self.connect()  # make sure we are connected
        # the last item is for the reply
        entry = [request, Event(), None]
        self.txq.put(entry)
        return entry

    def get_reply(self, entry):
        """wait for reply and return it

        raises TimeoutError when the event of the entry is not set within 10 s,
        ConnectionError when the event was set without a reply, and the
        exception given by error_map when the reply is an error message
        """
        if not entry[1].wait(10):  # the event of the entry
            raise TimeoutError('no response within 10s')
        if not entry[2]:  # the reply
            raise ConnectionError('connection closed before reply')
        action, _, data = entry[2]  # pylint: disable=unpacking-non-sequence
        if action.startswith(ERRORPREFIX):
            errcls = self.error_map(data[0])
            raise errcls(data[1])
        return entry[2]  # reply

    def request(self, action, ident=None, data=None):
        """make a request

        and wait for reply
        """
        entry = self.queue_request(action, ident, data)
        return self.get_reply(entry)

    def getParameter(self, module, parameter, trycache=False):
        """return (value, timestamp, readerror) of a parameter

        with <trycache> set, no read request is made when a cached item exists
        a read request is made only when online
        raises KeyError when no item is in the cache at the end
        """
        if trycache:
            cached = self.cache.get((module, parameter), None)
            if cached:
                return cached
        if self.online:
            try:
                self.request(READREQUEST, self.identifier[module, parameter])
            except secop.errors.SECoPError:
                # error reply message is already stored as readerror in cache
                pass
        return self.cache[module, parameter]

    def setParameter(self, module, parameter, value):
        """write a parameter, <value> is given as a string

        returns the cache item (value, timestamp, readerror) after the write
        """
        self.connect()  # make sure we are connected
        datatype = self.modules[module]['parameters'][parameter]['datatype']
        value = datatype.export_value(datatype.from_string(value))
        self.request(WRITEREQUEST, self.identifier[module, parameter], value)
        return self.cache[module, parameter]

    def execCommand(self, module, command, argument=None):
        """execute a command, <argument> is given as a string

        returns (data, qualifiers)
        raises BadValueError when an argument is given for a command without argument
        """
        self.connect()  # make sure we are connected
        datatype = self.modules[module]['commands'][command]['datatype'].argument
        if datatype:
            argument = datatype.export_value(datatype.from_string(argument))
        else:
            if argument is not None:
                raise secop.errors.BadValueError('command has no argument')
        # pylint: disable=unsubscriptable-object
        data, qualifiers = self.request(COMMANDREQUEST, self.identifier[module, command], argument)[2]
        datatype = self.modules[module]['commands'][command]['datatype'].result
        if datatype:
            data = datatype.import_value(data)
        return data, qualifiers

    # the following attributes may be/are intended to be overwritten by a subclass

    ERROR_MAP = secop.errors.EXCEPTIONS
    DEFAULT_EXCEPTION = secop.errors.SECoPError
    PREDEFINED_NAMES = set(secop.params.PREDEFINED_ACCESSIBLES)
    activate = True

    def error_map(self, exc):
        """how to convert SECoP and unknown exceptions"""
        return self.ERROR_MAP.get(exc, self.DEFAULT_EXCEPTION)

    def internalize_name(self, name):
        """how to create internal names"""
        if name.startswith('_') and name[1:] not in self.PREDEFINED_NAMES:
            return name[1:]
        return name