diff --git a/bin/secop-console b/bin/secop-console index babb778..faaaa08 100755 --- a/bin/secop-console +++ b/bin/secop-console @@ -37,7 +37,7 @@ sys.path[0] = basepath # do not move above! import mlzlog -from secop.client import ClientConsole +from secop.client.console import ClientConsole def parseArgv(argv): diff --git a/secop/client/__init__.py b/secop/client/__init__.py index 0557946..a535b93 100644 --- a/secop/client/__init__.py +++ b/secop/client/__init__.py @@ -17,184 +17,504 @@ # # Module authors: # Enrico Faulhaber +# Markus Zolliker # # ***************************************************************************** -"""Define Client side proxies""" +"""general SECoP client""" -# nothing here yet. +import time +import queue +import json +from threading import Event, RLock +from collections import defaultdict + +from secop.lib import mkthread, formatExtendedTraceback, formatExtendedStack +from secop.lib.asynconn import AsynConn, ConnectionClosed +from secop.datatypes import get_datatype +from secop.protocol.interface import encode_msg_frame, decode_msg +from secop.protocol.messages import REQUEST2REPLY, ERRORPREFIX, EVENTREPLY, WRITEREQUEST, WRITEREPLY, \ + READREQUEST, READREPLY, IDENTREQUEST, IDENTPREFIX, ENABLEEVENTSREQUEST, COMMANDREQUEST, DESCRIPTIONREQUEST +import secop.errors +import secop.params + +# replies to be handled for cache +UPDATE_MESSAGES = {EVENTREPLY, READREPLY, WRITEREPLY, ERRORPREFIX + READREQUEST, ERRORPREFIX + EVENTREPLY} -import code -import socket -import threading -from collections import deque -from os import path +class UNREGISTER: + """a magic value, used a returned value in a callback -try: - import mlzlog -except ImportError: - pass # has to be fixed in case this file is used again - -from secop.protocol.interface import decode_msg, encode_msg_frame, get_msg -from secop.protocol.messages import DESCRIPTIONREQUEST, EVENTREPLY - -try: - import configparser -except ImportError: - import ConfigParser as configparser + to indicate it has to be unregistered + used to implement one shot callbacks + """ +class Logger: + """dummy logger, in case not provided from caller""" -class NameSpace(dict): + @staticmethod + def info(fmt, *args, **kwds): + print(str(fmt) % args) - def __init__(self): - dict.__init__(self) - self.__const = set() - - def setconst(self, name, value): - dict.__setitem__(self, name, value) - self.__const.add(name) - - def __setitem__(self, name, value): - if name in self.__const: - raise RuntimeError('%s cannot be assigned' % name) - dict.__setitem__(self, name, value) - - def __delitem__(self, name): - if name in self.__const: - raise RuntimeError('%s cannot be deleted' % name) - dict.__delitem__(self, name) - - - -def getClientOpts(cfgfile): - parser = configparser.SafeConfigParser() - if not parser.read([cfgfile + '.cfg']): - print("Error reading cfg file %r" % cfgfile) - return {} - if not parser.has_section('client'): - print("No Server section found!") - return dict(item for item in parser.items('client')) - - -class ClientConsole: - - def __init__(self, cfgname, basepath): - self.namespace = NameSpace() - self.namespace.setconst('help', self.helpCmd) - - cfgfile = path.join(basepath, 'etc', cfgname) - cfg = getClientOpts(cfgfile) - self.client = Client(cfg) - self.client.populateNamespace(self.namespace) - - def run(self): - console = code.InteractiveConsole(self.namespace) - console.interact("Welcome to the SECoP console") - - def close(self): + @staticmethod + def noop(fmt, *args, **kwds): pass - def helpCmd(self, arg=Ellipsis): - if arg is Ellipsis: - print("No help available yet") + debug = noop + error = warning = critical = info + + +class CallbackMixin: + """abstract mixin + + this is mainly for documentation, but it might be extended + and used as a mixin for objects registered as a callback + """ + def updateEvent(self, module, parameter, value, timestamp, readerror): + """called whenever a value is changed + + or when new callbacks are registered + """ + + def unhandledMessage(self, action, ident, data): + """called on an unhandled message""" + + def nodeStateChange(self, online, state): + """called when the state of the connection changes + + 'online' is True when connected or reconnecting, False when disconnected or connecting + 'state' is the connection state as a string + """ + + def descriptiveDataChange(self, module, description): + """called when the description has changed + + this callback is called on the node with module=None + and on every changed module with module== + """ + + +class SecopClient: + """a general SECoP client""" + reconnect_timeout = 10 + shutdown = False + _rxthread = None + _txthread = None + _state = 'disconnected' # further possible values: 'connecting', 'reconnecting', 'connected' + online = False # connected or reconnecting since a short time + disconnect_time = 0 # time of last disconnect + secop_version = '' + _rxbuffer = b'' + descriptive_data = {} + CALLBACK_NAMES = 'updateEvent', 'nodeStateChange', 'unhandledMessage', 'descriptiveDataChange', 'handleMessage' + callbacks = {} + modules = {} + _last_error = None + validate_data = False + + def __init__(self, uri, log=Logger): + """like __init__, but called from SecopClient.__new__""" + # maps expected replies to [request, Event, is_error, result] until a response came + # there can only be one entry per thread calling 'request' + self.active_requests = {} + # caches (module, parameter) = value, timestamp, readerror (internal names!) + self.cache = {} + self.io = None + self.callbacks = {cbname: defaultdict(list) for cbname in self.CALLBACK_NAMES} + self.txq = queue.Queue(30) # queue for tx requests + self.pending = queue.Queue(30) # requests with colliding action + ident + self.log = log + self.uri = uri + self.nodename = uri + self._lock = RLock() + + def __del__(self): + try: + self.disconnect() + except Exception: + pass + + def connect(self, try_period=0): + """establish connection + + if a is given, repeat trying for the given time (sec) + """ + with self._lock: + if self.io: + return + if self.online: + self._set_state(True, 'reconnecting') + else: + self._set_state(False, 'connecting') + deadline = time.time() + try_period + while True: + try: + self.io = AsynConn(self.uri) # timeout 1 sec + self.io.writeline(IDENTREQUEST.encode('utf-8')) + reply = self.io.readline(10) + if reply: + self.secop_version = reply.decode('utf-8') + else: + raise self.error_map('HardwareError')('no answer to %s' % IDENTREQUEST) + if not self.secop_version.startswith(IDENTPREFIX): + raise self.error_map('HardwareError')('bad answer to %s: %r' % + (IDENTREQUEST, self.secop_version)) + # now its safe to do secop stuff + self._rxthread = mkthread(self.__rxthread) + self._txthread = mkthread(self.__txthread) + self.log.debug('connected to %s', self.uri) + # pylint: disable=unsubscriptable-object + self._init_descriptive_data(self.request(DESCRIPTIONREQUEST)[2]) + self.nodename = self.properties.get('equipment_id', self.uri) + if self.activate: + self.request(ENABLEEVENTSREQUEST) + self._set_state(True, 'connected') + break + except Exception: + # print(formatExtendedTraceback()) + if time.time() > deadline: + # stay online for now, if activated + self._set_state(self.online and self.activate, 'disconnected') + raise + time.sleep(1) + self.log.info('%s ready', self.nodename) + + def __txthread(self): + while not self.shutdown: + entry = self.txq.get() + if entry is None: + break + request = entry[0] + reply_action = REQUEST2REPLY.get(request[0], None) + if reply_action: + key = (reply_action, request[1]) # action and identifier + else: # allow experimental unknown requests, but only one at a time + key = None + if key in self.active_requests: + # store to requeue after the next reply was received + self.pending.put(entry) + else: + self.active_requests[key] = entry + line = encode_msg_frame(*request) + self.log.debug('TX: %r', line) + self.io.send(line) + self._txthread = None + self.disconnect() + + def __rxthread(self): + while not self.shutdown: + try: + reply = self.io.readline() + if reply is None: + continue + except ConnectionClosed: + break + action, ident, data = decode_msg(reply) + if ident == '.': + ident = None + if action in UPDATE_MESSAGES: + module_param = self.internal.get(ident, None) + if module_param is None and ':' not in ident: + # allow missing ':value'/':target' + if action == WRITEREPLY: + module_param = self.internal.get(ident + ':target', None) + else: + module_param = self.internal.get(ident + ':value', None) + if module_param is not None: + if action.startswith(ERRORPREFIX): + timestamp = data[2].get('t', None) + readerror = tuple(data[0:2]) + value = None + else: + timestamp = data[1].get('t', None) + value = data[0] + readerror = None + module, param = module_param + self._update_value(module, param, value, timestamp, readerror) + if action in (EVENTREPLY, ERRORPREFIX + EVENTREPLY): + continue + try: + key = action, ident + entry = self.active_requests.pop(key) + except KeyError: + if action.startswith(ERRORPREFIX): + try: + key = REQUEST2REPLY[action[len(ERRORPREFIX):]], ident + except KeyError: + key = None + entry = self.active_requests.pop(key, None) + else: + # this may be a response to the last unknown request + key = None + entry = self.active_requests.pop(key, None) + if entry is None: + self._unhandled_message(action, ident, data) + continue + entry[2] = action, ident, data + entry[1].set() # trigger event + while not self.pending.empty(): + # let the TX thread sort out which entry to treat + # this may have bad performance, but happens rarely + self.txq.put(self.pending.get()) + + self._rxthread = None + self.disconnect() + if self.activate: + self.log.info('reconnect to %s', self.uri) + mkthread(self._reconnect) else: - help(arg) + self.log.warning('%s disconnected', self.uri) + self._set_state(False, 'disconnected') + def spawn_connect(self, connected_callback=None): + """try to connect in background -class TCPConnection: + and trigger event when done and event is not None + """ + self.disconnect_time = time.time() + mkthread(self._reconnect, connected_callback) - def __init__(self, connect, port, **kwds): - self.log = mlzlog.log.getChild('connection', False) - self.connection = socket.create_connection((connect, port), 3) - self.queue = deque() - self._rcvdata = '' - self.callbacks = set() - self._thread = threading.Thread(target=self.thread) - self._thread.daemonize = True - self._thread.start() - - def send(self, msg): - self.log.debug("Sending msg %r" % msg) - data = encode_msg_frame(*msg.serialize()) - self.log.debug("raw data: %r" % data) - self.connection.sendall(data) - - def thread(self): + def _reconnect(self, connected_callback=None): while True: try: - self.thread_step() + self.connect() + if connected_callback: + connected_callback() + break except Exception as e: - self.log.exception("Exception in RCV thread: %r" % e) + txt = str(e).split('\n', 1)[0] + if txt != self._last_error: + self._last_error = txt + self.log.error(str(e)) + if time.time() > self.disconnect_time + self.reconnect_timeout: + if self.online: # was recently connected + self.disconnect_time = 0 + self.log.warning('can not reconnect to %s (%r)' % (self.nodename, e)) + # self.log.warning(formatExtendedTraceback()) + self._set_state(False, 'disconnected') + time.sleep(self.reconnect_timeout) + else: + time.sleep(1) - def thread_step(self): - data = b'' - while True: - newdata = self.connection.recv(1024) - self.log.debug("RCV: got raw data %r" % newdata) - data = data + newdata + def disconnect(self): + self.shutdown = True + self.disconnect_time = time.time() + if self._txthread: + self.txq.put(None) # shutdownmarker + self._txthread.join() + self._txthread = None + if self._rxthread: + self._rxthread.join() + self._rxthread = None + if self.io: + self.io.disconnect() + self.io = None + # abort pending requests early + try: # avoid race condition + while self.active_requests: + _, (_, event, _) = self.active_requests.popitem() + event.set() + except KeyError: + pass + try: while True: - origin, data = get_msg(data) - if origin is None: - break # no more messages to process - if not origin: # empty string - continue # ??? - _ = decode_msg(origin) - # construct msgObj from msg - try: - #msgObj = Message(*msg) - #msgObj.origin = origin.decode('latin-1') - #self.handle(msgObj) - pass - except Exception: - # ??? what to do here? - pass + _, event, _ = self.pending.get(block=False) + event.set() + except queue.Empty: + pass + self.shutdown = False - def handle(self, msg): - if msg.action == EVENTREPLY: - self.log.info("got Async: %r" % msg) - for cb in self.callbacks: - try: - cb(msg) - except Exception as e: - self.log.debug( - "handle_async: got exception %r" % e, exception=True) + def _init_descriptive_data(self, data): + """rebuild descriptive data""" + changed_modules = None + if json.dumps(data, sort_keys=True) != json.dumps(self.descriptive_data, sort_keys=True): + if self.descriptive_data: + changed_modules = set() + modules = data.get('modules', {}) + for modname, moddesc in self.descriptive_data['modules'].items(): + if json.dumps(moddesc, sort_keys=True) != json.dumps(modules.get(modname), sort_keys=True): + changed_modules.add(modname) + self.descriptive_data = data + modules = data['modules'] + self.modules = {} + self.properties = {k: v for k, v in data.items() if k != 'modules'} + self.identifier = {} # map (module, parameter) -> identifier + self.internal = {} # map identifier -> (module, parameter) + for modname, moddescr in modules.items(): + # separate accessibles into command and parameters + parameters = {} + commands = {} + accessibles = moddescr['accessibles'] + for aname, aentry in accessibles.items(): + aentry = dict(aentry, datatype=get_datatype(aentry['datainfo'])) + iname = self.internalize_name(aname) + ident = '%s:%s' % (modname, aname) + self.identifier[modname, iname] = ident + self.internal[ident] = modname, iname + if aentry['datainfo']['type'] == 'command': + commands[iname] = aentry + else: + parameters[iname] = aentry + properties = {k: v for k, v in moddescr.items() if k != 'accessibles'} + self.modules[modname] = dict(accessibles=accessibles, parameters=parameters, + commands=commands, properties=properties) + if changed_modules is not None: + done = self.node_callback('descriptiveDataChange', None, self) + for mname in changed_modules: + if not self.module_callback('descriptiveDataChange', mname, mname, self): + self.log.warning('descriptive data changed on module %r', mname) + done = True + if not done: + self.log.warning('descriptive data of %r changed', self.nodename) + + def register(self, obj=None, module=None, **kwds): + """register callback functions + + - kwds keys must be valid callback name defined in self.CALLBACK_NAMES + - kwds names are the callback functions + - if obj is not None, use its methods named from the callback name, if not given in kwds + - module may be a module name. if not None and not omitted, the registered callback will + be called only when it is related to the given module + """ + for cbname in self.CALLBACK_NAMES: + cbfunc = kwds.pop(cbname, None) + if obj and cbfunc is None: + cbfunc = getattr(obj, cbname, None) + if not cbfunc: + continue + cbdict = self.callbacks[cbname] + cbdict[module].append(cbfunc) + if cbname == 'updateEvent': + if module is None: + for (mname, pname), data in self.cache.items(): + cbfunc(mname, pname, *data) + else: + for (mname, pname), data in self.cache.items(): + if mname == module: + cbfunc(mname, pname, *data) + elif cbname == 'nodeStateChange': + cbfunc(self.online, self._state) + if kwds: + raise TypeError('unknown callback: %s' % (', '.join(kwds))) + + def node_callback(self, cbname, *args): + cblist = self.callbacks[cbname].get(None, []) + self.callbacks[cbname][None] = [cb for cb in cblist if cb(*args) is not UNREGISTER] + return bool(cblist) + + def module_callback(self, cbname, mname, *args): + cblist = self.callbacks[cbname].get(mname, []) + self.callbacks[cbname][mname] = [cb for cb in cblist if cb(*args) is not UNREGISTER] + return bool(cblist) + + def _update_value(self, module, param, value, timestamp, readerror): + if readerror: + assert isinstance(readerror, tuple) + if self.validate_data: + try: + # try to validate, reason: make enum_members from integers + datatype = self.modules[module]['parameters'][param]['datatype'] + value = datatype(value) + except (KeyError, ValueError): + pass + self.cache[(module, param)] = (value, timestamp, readerror) + self.node_callback('updateEvent', module, param, value, timestamp, readerror) + self.module_callback('updateEvent', module, module, param, value, timestamp, readerror) + + def _unhandled_message(self, action, ident, data): + mname = None + if ident: + mname = ident.split(':')[0] + done = self.node_callback('unhandledMessage', action, ident, data) + done = self.module_callback('unhandledMessage', mname, action, ident, data) or done + if not done: + self.log.warning('unhandled message: %s %s %r' % (action, ident, data)) + + def _set_state(self, online, state=None): + # treat reconnecting as online! + self._state = state or self._state + self.online = online + self.node_callback('nodeStateChange', self.online, self._state) + for mname in self.modules: + self.module_callback('nodeStateChange', mname, self.online, self._state) + + def queue_request(self, action, ident=None, data=None): + """make a request""" + request = action, ident, data + self.connect() # make sure we are connected + # the last item is for the reply + entry = [request, Event(), None] + self.txq.put(entry) + return entry + + def get_reply(self, entry): + """wait for reply and return it""" + if not entry[1].wait(10): # entry + raise TimeoutError('no response within 10s') + if not entry[2]: # reply + raise ConnectionError('connection closed before reply') + action, _, data = entry[2] # pylint: disable=unpacking-non-sequence + if action.startswith(ERRORPREFIX): + errcls = self.error_map(data[0]) + raise errcls(data[1]) + return entry[2] # reply + + def request(self, action, ident=None, data=None): + """make a request + + and wait for reply + """ + entry = self.queue_request(action, ident, data) + return self.get_reply(entry) + + def getParameter(self, module, parameter, trycache=False): + if trycache: + cached = self.cache.get((module, parameter), None) + if cached: + return cached + if self.online: + try: + self.request(READREQUEST, self.identifier[module, parameter]) + except secop.errors.SECoPError: + # error reply message is already stored as readerror in cache + pass + return self.cache[module, parameter] + + def setParameter(self, module, parameter, value): + self.connect() # make sure we are connected + datatype = self.modules[module]['parameters'][parameter]['datatype'] + value = datatype.export_value(datatype.from_string(value)) + self.request(WRITEREQUEST, self.identifier[module, parameter], value) + return self.cache[module, parameter] + + def execCommand(self, module, command, argument=None): + self.connect() # make sure we are connected + datatype = self.modules[module]['commands'][command]['datatype'].argument + if datatype: + argument = datatype.export_value(datatype.from_string(argument)) else: - self.queue.append(msg) + if argument is not None: + raise secop.errors.BadValueError('command has no argument') + # pylint: disable=unsubscriptable-object + data, qualifiers = self.request(COMMANDREQUEST, self.identifier[module, command], argument)[2] + datatype = self.modules[module]['commands'][command]['datatype'].result + if datatype: + data = datatype.import_value(data) + return data, qualifiers - def read(self): - while not self.queue: - pass # XXX: remove BUSY polling - return self.queue.popleft() + # the following attributes may be/are intended to be overwritten by a subclass - def register_callback(self, callback): - """registers callback for async data""" - self.callbacks.add(callback) + ERROR_MAP = secop.errors.EXCEPTIONS + DEFAULT_EXCEPTION = secop.errors.SECoPError + PREDEFINED_NAMES = set(secop.params.PREDEFINED_ACCESSIBLES) + activate = True - def unregister_callback(self, callback): - """unregisters callback for async data""" - self.callbacks.discard(callback) + def error_map(self, exc): + """how to convert SECoP and unknown exceptions""" + return self.ERROR_MAP.get(exc, self.DEFAULT_EXCEPTION) - -class Client: - - def __init__(self, opts): - self.log = mlzlog.log.getChild('client', True) - self._cache = dict() - self.connection = TCPConnection(**opts) - self.connection.register_callback(self.handle_async) - - def handle_async(self, msg): - self.log.info("Got async update %r" % msg) - module = msg.module - param = msg.param - value = msg.value - self._cache.getdefault(module, {})[param] = value - # XXX: further notification-callbacks needed ??? - - def populateNamespace(self, namespace): - #self.connection.send(Message(DESCRIPTIONREQUEST)) - # reply = self.connection.read() - # self.log.info("found modules %r" % reply) - # create proxies, populate cache.... - namespace.setconst('connection', self.connection) + def internalize_name(self, name): + """how to create internal names""" + if name.startswith('_') and name[1:] not in self.PREDEFINED_NAMES: + return name[1:] + return name diff --git a/secop/client/console.py b/secop/client/console.py new file mode 100644 index 0000000..df3d815 --- /dev/null +++ b/secop/client/console.py @@ -0,0 +1,193 @@ +# -*- coding: utf-8 -*- +# ***************************************************************************** +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free Software +# Foundation; either version 2 of the License, or (at your option) any later +# version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# this program; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Module authors: +# Enrico Faulhaber +# +# ***************************************************************************** +"""console client""" + +# this needs to be reworked or removed + + +import code +import socket +import threading +from collections import deque +from os import path +import configparser +import mlzlog + +from secop.protocol.interface import decode_msg, encode_msg_frame, get_msg +from secop.protocol.messages import EVENTREPLY + + + +class NameSpace(dict): + + def __init__(self): + dict.__init__(self) + self.__const = set() + + def setconst(self, name, value): + dict.__setitem__(self, name, value) + self.__const.add(name) + + def __setitem__(self, name, value): + if name in self.__const: + raise RuntimeError('%s cannot be assigned' % name) + dict.__setitem__(self, name, value) + + def __delitem__(self, name): + if name in self.__const: + raise RuntimeError('%s cannot be deleted' % name) + dict.__delitem__(self, name) + + + +def getClientOpts(cfgfile): + parser = configparser.SafeConfigParser() + if not parser.read([cfgfile + '.cfg']): + print("Error reading cfg file %r" % cfgfile) + return {} + if not parser.has_section('client'): + print("No Server section found!") + return dict(item for item in parser.items('client')) + + +class ClientConsole: + + def __init__(self, cfgname, basepath): + self.namespace = NameSpace() + self.namespace.setconst('help', self.helpCmd) + + cfgfile = path.join(basepath, 'etc', cfgname) + cfg = getClientOpts(cfgfile) + self.client = Client(cfg) + self.client.populateNamespace(self.namespace) + + def run(self): + console = code.InteractiveConsole(self.namespace) + console.interact("Welcome to the SECoP console") + + def close(self): + pass + + def helpCmd(self, arg=Ellipsis): + if arg is Ellipsis: + print("No help available yet") + else: + help(arg) + + +class TCPConnection: + + def __init__(self, connect, port, **kwds): + self.log = mlzlog.log.getChild('connection', False) + port = int(port) + self.connection = socket.create_connection((connect, port), 3) + self.queue = deque() + self._rcvdata = '' + self.callbacks = set() + self._thread = threading.Thread(target=self.thread) + self._thread.daemonize = True + self._thread.start() + + def send(self, msg): + self.log.debug("Sending msg %r" % msg) + data = encode_msg_frame(*msg.serialize()) + self.log.debug("raw data: %r" % data) + self.connection.sendall(data) + + def thread(self): + while True: + try: + self.thread_step() + except Exception as e: + self.log.exception("Exception in RCV thread: %r" % e) + + def thread_step(self): + data = b'' + while True: + newdata = self.connection.recv(1024) + self.log.debug("RCV: got raw data %r" % newdata) + data = data + newdata + while True: + origin, data = get_msg(data) + if origin is None: + break # no more messages to process + if not origin: # empty string + continue # ??? + _ = decode_msg(origin) + # construct msgObj from msg + try: + #msgObj = Message(*msg) + #msgObj.origin = origin.decode('latin-1') + #self.handle(msgObj) + pass + except Exception: + # ??? what to do here? + pass + + def handle(self, msg): + if msg.action == EVENTREPLY: + self.log.info("got Async: %r" % msg) + for cb in self.callbacks: + try: + cb(msg) + except Exception as e: + self.log.debug( + "handle_async: got exception %r" % e, exception=True) + else: + self.queue.append(msg) + + def read(self): + while not self.queue: + pass # XXX: remove BUSY polling + return self.queue.popleft() + + def register_callback(self, callback): + """registers callback for async data""" + self.callbacks.add(callback) + + def unregister_callback(self, callback): + """unregisters callback for async data""" + self.callbacks.discard(callback) + + +class Client: + + def __init__(self, opts): + self.log = mlzlog.log.getChild('client', True) + self._cache = dict() + self.connection = TCPConnection(**opts) + self.connection.register_callback(self.handle_async) + + def handle_async(self, msg): + self.log.info("Got async update %r" % msg) + module = msg.module + param = msg.param + value = msg.value + self._cache.getdefault(module, {})[param] = value + # XXX: further notification-callbacks needed ??? + + def populateNamespace(self, namespace): + #self.connection.send(Message(DESCRIPTIONREQUEST)) + # reply = self.connection.read() + # self.log.info("found modules %r" % reply) + # create proxies, populate cache.... + namespace.setconst('connection', self.connection) diff --git a/secop/lib/asynconn.py b/secop/lib/asynconn.py new file mode 100644 index 0000000..d7c3e26 --- /dev/null +++ b/secop/lib/asynconn.py @@ -0,0 +1,140 @@ +# -*- coding: utf-8 -*- +# ***************************************************************************** +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free Software +# Foundation; either version 2 of the License, or (at your option) any later +# version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# this program; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Module authors: +# Enrico Faulhaber +# Markus Zolliker +# +# ***************************************************************************** + +"""asynchonous connections + +generic class for byte oriented communication +includes implementation for TCP connections +""" + +import socket +import time + +from secop.lib import parseHostPort, tcpSocket, closeSocket + + +class ConnectionClosed(ConnectionError): + pass + + +class AsynConn: + timeout = 1 # inter byte timeout + SCHEME_MAP = {} + connection = None # is not None, if connected + defaultport = None + + def __new__(cls, uri): + scheme = uri.split('://')[0] + iocls = cls.SCHEME_MAP.get(scheme, None) + if not iocls: + # try tcp, if scheme not given + try: + host_port = parseHostPort(uri, cls.defaultport) + except (ValueError, TypeError, AssertionError): + raise ValueError('invalid uri: %s' % uri) + iocls = cls.SCHEME_MAP['tcp'] + uri = 'tcp://%s:%d' % host_port + return object.__new__(iocls) + + def __init__(self, *args): + self._rxbuffer = b'' + + def __del__(self): + self.disconnect() + + @classmethod + def register_scheme(cls, scheme): + cls.SCHEME_MAP[scheme] = cls + + def disconnect(self): + raise NotImplementedError + + def send(self, data): + """send data (bytes!) + + tries to send all data""" + raise NotImplementedError + + def recv(self): + """return bytes received within timeout + + in contrast to socket.recv: + - returns b'' on timeout + - raises ConnectionClosed if the other end has disconnected + """ + raise NotImplementedError + + def readline(self, timeout=None): + """read one line + + return either a complete line or None in case of timeout + the timeout argument may increase, but not decrease the default timeout + """ + if timeout: + end = time.time() + timeout + while b'\n' not in self._rxbuffer: + data = self.recv() + if not data: + if timeout: + if time.time() < end: + continue + raise TimeoutError('timeout in readline') + return None + self._rxbuffer += data + line, self._rxbuffer = self._rxbuffer.split(b'\n', 1) + return line + + def writeline(self, line): + self.send(line + b'\n') + + +class AsynTcp(AsynConn): + def __init__(self, uri): + super().__init__() + self.uri = uri + if uri.startswith('tcp://'): + # should be the case always + uri = uri[6:] + self.connection = tcpSocket(uri, self.defaultport, self.timeout) + + def disconnect(self): + if self.connection: + closeSocket(self.connection) + self.connection = None + + def send(self, data): + """send data (bytes!)""" + self.connection.sendall(data) + + def recv(self): + """return bytes received within 1 sec""" + try: + data = self.connection.recv(8192) + if data: + return data + except socket.timeout: + # timeout while waiting + return b'' + raise ConnectionClosed() # marks end of connection + +AsynTcp.register_scheme('tcp') diff --git a/secop/protocol/interface/tcp.py b/secop/protocol/interface/tcp.py index 8ea5380..615225d 100644 --- a/secop/protocol/interface/tcp.py +++ b/secop/protocol/interface/tcp.py @@ -38,10 +38,7 @@ from secop.protocol.messages import ERRORPREFIX, \ DEF_PORT = 10767 MESSAGE_READ_SIZE = 1024 - -CR = b'\r' -SPACE = b' ' - +HELP = HELPREQUEST.encode() class OutputBufferOverflow(Exception): pass @@ -116,9 +113,12 @@ class TCPRequestHandler(socketserver.BaseRequestHandler): if origin is None: break # no more messages to process origin = origin.strip() - if origin in (HELPREQUEST, ''): # empty string -> send help message + if origin in (HELP, b''): # empty string -> send help message for idx, line in enumerate(HelpMessage.splitlines()): - self.queue_async_reply((HELPREPLY, '%d' % (idx+1), line)) + # not sending HELPREPLY here, as there should be only one reply for every request + self.queue_async_reply(('_', '%d' % (idx+1), line)) + # ident matches request + self.queue_async_reply((HELPREPLY, None, None)) continue try: msg = decode_msg(origin) diff --git a/secop/protocol/messages.py b/secop/protocol/messages.py index 561d859..8f9dcba 100644 --- a/secop/protocol/messages.py +++ b/secop/protocol/messages.py @@ -25,7 +25,8 @@ IDENTREQUEST = '*IDN?' # literal # literal! first part is fixed! -IDENTREPLY = 'SINE2020&ISSE,SECoP,V2019-08-20,v1.0 RC2' +IDENTPREFIX = 'SINE2020&ISSE,SECoP,' +IDENTREPLY = IDENTPREFIX + 'V2019-08-20,v1.0 RC2' DESCRIPTIONREQUEST = 'describe' # literal DESCRIPTIONREPLY = 'describing' # + +json @@ -65,8 +66,8 @@ HELPREQUEST = 'help' # literal HELPREPLY = 'helping' # +line number +json_text # helper mapping to find the REPLY for a REQUEST +# do not put IDENTREQUEST/IDENTREPLY here, as this needs anyway extra treatment REQUEST2REPLY = { - IDENTREQUEST: IDENTREPLY, DESCRIPTIONREQUEST: DESCRIPTIONREPLY, ENABLEEVENTSREQUEST: ENABLEEVENTSREPLY, DISABLEEVENTSREQUEST: DISABLEEVENTSREPLY,