From 7320ac153897fc646426385fb077196ea1832f28 Mon Sep 17 00:00:00 2001 From: Enrico Faulhaber Date: Wed, 14 Dec 2016 18:26:37 +0100 Subject: [PATCH] Provide basic client Object also improve the describing data and core params Change-Id: I645444f2a618fdfd40a729e1007c58def24d5ffb --- etc/test.cfg | 2 +- secop/client/__init__.py | 8 +- secop/client/baseclient.py | 350 +++++++++++++++++++++++++++++ secop/devices/core.py | 16 ++ secop/lib/__init__.py | 8 + secop/lib/parsing.py | 103 ++++++++- secop/protocol/dispatcher.py | 39 +++- secop/protocol/encoding/demo_v4.py | 11 +- 8 files changed, 509 insertions(+), 28 deletions(-) create mode 100644 secop/client/baseclient.py diff --git a/etc/test.cfg b/etc/test.cfg index 2786898..4a8c7d4 100644 --- a/etc/test.cfg +++ b/etc/test.cfg @@ -2,7 +2,7 @@ id=Fancy_ID_without_spaces-like:MLZ_furnace7 [client] -connect=0.0.0.0 +connectto=0.0.0.0 port=10767 interface = tcp framing=eol diff --git a/secop/client/__init__.py b/secop/client/__init__.py index 7b8d59f..0876181 100644 --- a/secop/client/__init__.py +++ b/secop/client/__init__.py @@ -89,11 +89,12 @@ class ClientConsole(object): else: help(arg) -import loggers import socket import threading from collections import deque -from secop.protocol.transport import FRAMERS, ENCODERS +from secop import loggers +from secop.protocol.encoding import ENCODERS +from secop.protocol.framing import FRAMERS from secop.protocol.messages import * @@ -165,9 +166,6 @@ class TCPConnection(object): self.callbacks.discard(callback) -import loggers - - class Client(object): def __init__(self, opts): diff --git a/secop/client/baseclient.py b/secop/client/baseclient.py new file mode 100644 index 0000000..617a2ff --- /dev/null +++ b/secop/client/baseclient.py @@ -0,0 +1,350 @@ +# -*- coding: utf-8 -*- +# ***************************************************************************** +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free Software +# Foundation; either version 2 of the License, or (at your option) any later +# version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# this program; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Module authors: +# Enrico Faulhaber +# +# ***************************************************************************** + +"""Define Client side proxies""" + +import json +import socket +import serial +import threading +import Queue + +from secop import loggers +from secop.lib import mkthread +from secop.lib.parsing import parse_time, format_time +from secop.protocol.encoding import ENCODERS +from secop.protocol.framing import FRAMERS +from secop.protocol.messages import * + + + +class TCPConnection(object): + # disguise a TCP connection as serial one + def __init__(self, host, port): + self._host = host + self._port = int(port) + self._thread = None + self.connect() + + def connect(self): + self._readbuffer = Queue.Queue(100) + io = socket.create_connection((self._host, self._port)) + io.setblocking(False) + io.settimeout(0.3) + self._io = io + if self._thread and self._thread.is_alive(): + return + self._thread = mkthread(self._run) + + def _run(self): + try: + data = u'' + while True: + try: + newdata = self._io.recv(1024) + except socket.timeout: + newdata = u'' + pass + except Exception as err: + print err, "reconnecting" + self.connect() + data = u'' + continue + data += newdata + while '\n' in data: + line, data = data.split('\n', 1) + try: + self._readbuffer.put(line.strip('\r'), block=True, timeout=1) + except Queue.Full: + self.log.debug('rcv queue full! dropping line: %r' % line) + finally: + self._thread = None + + def readline(self, block=False): + """blocks until a full line was read and returns it""" + i = 10; + while i: + try: + return self._readbuffer.get(block=True, timeout=1) + except Queue.Empty: + continue + if not block: + i -= 1 + + def readable(self): + return not self._readbuffer.empty() + + def write(self, data): + self._io.sendall(data) + + def writeline(self, line): + self.write(line + '\n') + + def writelines(self, *lines): + for line in lines: + self.writeline(line) + + +class Value(object): + t = None + u = None + e = None + fmtstr = '%s' + + def __init__(self, value, qualifiers={}): + self.value = value + if 't' in qualifiers: + self.t = parse_time(qualifiers.pop('t')) + self.__dict__.update(qualifiers) + + def __repr__(self): + r = [] + if self.t is not None: + r.append("timestamp=%r" % format_time(self.t)) + if self.u is not None: + r.append('unit=%r' % self.u) + if self.e is not None: + r.append(('error=%s' % self.fmtstr) % self.e) + if r: + return (self.fmtstr + '(%s)') % (self.value, ', '.join(r)) + return self.fmtstr % self.value + + +class Client(object): + equipmentId = 'unknown' + secop_id = 'unknown' + describing_data = {} + stopflag = False + + def __init__(self, opts): + self.log = loggers.log.getChild('client', True) + self._cache = dict() + if 'device' in opts: + # serial port + devport = opts.pop('device') + baudrate = int(opts.pop('baudrate', 115200)) + self.contactPoint = "serial://%s:%s" % (devport, baudrate) + self.connection = serial.Serial(devport, baudrate=baudrate, + timeout=1) + else: + host = opts.pop('connectto', 'localhost') + port = int(opts.pop('port', 10767)) + self.contactPoint = "tcp://%s:%d" % (host, port) + self.connection = TCPConnection(host, port) + # maps an expected reply to an list containing a single Event() + # upon rcv of that reply, the event is set and the listitem 0 is + # appended with the reply-tuple + self.expected_replies = {} + + # maps spec to a set of callback functions (or single_shot callbacks) + self.callbacks = dict() + self.single_shots = dict() + + # mapping the modulename to a dict mapping the parameter names to their values + # note: the module value is stored as the value of the parameter value of the module + self.cache = dict() + + self._syncLock = threading.RLock() + self._thread = threading.Thread(target=self._run) + self._thread.daemon = True + self._thread.start() + + def _run(self): + while not self.stopflag: + try: + self._inner_run() + except Exception as err: + self.log.exception(err) + raise + + def _inner_run(self): + data = '' + self.connection.writeline('*IDN?') + idstring = self.connection.readline() + self.log.info('connected to: ' + idstring.strip()) + + while not self.stopflag: + line = self.connection.readline() + if line.startswith(('SECoP', 'Sine2020WP7')): + self.secop_id = line + continue + msgtype, spec, data = self._decode_message(line) + if msgtype in ('event','changed'): + # handle async stuff + self._handle_event(spec, data) + if msgtype != 'event': + # handle sync stuff + if msgtype == "ERROR" or msgtype in self.expected_replies: + # XXX: make an assignment of ERROR to an expected reply. + entry = self.expected_replies[msgtype] + entry.extend([spec, data]) + # wake up calling process + entry[0].set() + else: + self.log.error('ignoring unexpected reply %r' % line) + + + def _encode_message(self, requesttype, spec='', data=Ellipsis): + """encodes the given message to a string + """ + req = [str(requesttype)] + if spec: + req.append(str(spec)) + if data is not Ellipsis: + req.append(json.dumps(data)) + req = ' '.join(req) + return req + + def _decode_message(self, msg): + """return a decoded message tripel""" + msg = msg.strip() + if ' ' not in msg: + return msg, None, None + msgtype, spec = msg.split(' ', 1) + data = None + if ' ' in spec: + spec, json_data = spec.split(' ', 1) + try: + data = json.loads(json_data) + except ValueError: + # keep as string + data = json_data + return msgtype, spec, data + + def _handle_event(self, spec, data): + """handles event""" + self.log.info('handle_event %r %r' % (spec, data)) + if ':' not in spec: + self.log.warning("deprecated specifier %r" % spec) + spec = '%s:value' % spec + modname, pname = spec.split(':', 1) + self.cache.setdefault(modname, {})[pname] = Value(*data) + if spec in self.callbacks: + for func in self.callbacks[spec]: + try: + mkthread(func, modname, pname, data) + except Exception as err: + self.log.exception('Exception in Callback!', err) + run = set() + if spec in self.single_shots: + for func in self.single_shots[spec]: + try: + mkthread(func, data) + except Exception as err: + self.log.exception('Exception in Single-shot Callback!', err) + run.add(func) + self.single_shots[spec].difference_update(run) + + def register_callback(self, module, parameter, cb): + self.log.debug('registering callback %r for %s:%s' % (cb, module, parameter)) + self.callbacks.setdefault('%s:%s' % (module, parameter), set()).add(cb) + + def unregister_callback(self, module, parameter, cb): + self.log.debug('unregistering callback %r for %s:%s' % (cb, module, parameter)) + self.callbacks.setdefault('%s:%s' % (module, parameter), set()).discard(cb) + + def communicate(self, msgtype, spec='', data=Ellipsis): + # maps each (sync) request to the corresponding reply + # XXX: should go to the encoder! and be imported here (or make a translating method) + REPLYMAP = { + "describe": "describing", + "do": "done", + "change": "changed", + "activate": "active", + "deactivate": "inactive", + "*IDN?": "SECoP,", + "ping": "ping", + } + if self.stopflag: + raise RuntimeError('alreading stopping!') + if msgtype == 'poll': + # send a poll request and then check incoming events + if ':' not in spec: + spec = spec + ':value' + event = threading.Event() + result = ['polled', spec] + self.single_shots.setdefault(spec, set()).add(lambda d: (result.append(d), event.set())) + self.connection.writeline(self._encode_message(msgtype, spec, data)) + if event.wait(10): + return tuple(result) + raise RuntimeError("timeout upon waiting for reply!") + + rply = REPLYMAP[msgtype] + if rply in self.expected_replies: + raise RuntimeError("can not have more than one requests of the same type at the same time!") + event = threading.Event() + self.expected_replies[rply] = [event] + self.connection.writeline(self._encode_message(msgtype, spec, data)) + if event.wait(10): # wait 10s for reply + result = rply, self.expected_replies[rply][1], self.expected_replies[rply][2] + del self.expected_replies[rply] + return result + del self.expected_replies[rply] + raise RuntimeError("timeout upon waiting for reply!") + + def quit(self): + # after calling this the client is dysfunctional! + self.communicate('deactivate') + self.stopflag = True + if self._thread and self._thread.is_alive(): + self.thread.join(self._thread) + + def handle_async(self, msg): + self.log.info("Got async update %r" % msg) + device = msg.device + param = msg.param + value = msg.value + self._cache.getdefault(device, {})[param] = value + # XXX: further notification-callbacks needed ??? + + + def startup(self, async=False): + _, self.equipment_id, self.describing_data = self.communicate('describe') + # always fill our cache + self.communicate('activate') + # deactivate updates if not wanted + if not async: + self.communicate('deactivate') + + @property + def protocolVersion(self): + return self.secop_id + + @property + def modules(self): + return self.describing_data['modules'].keys() + + def getParameters(self, module): + return self.describing_data['modules'][module]['parameters'].keys() + + def getModuleBaseClass(self, module): + return self.describing_data['modules'][module]['baseclass'] + + def getCommands(self, module): + return self.describing_data['modules'][module]['commands'].keys() + + def getProperties(self, module, parameter): + return self.describing_data['modules'][module]['parameters'][parameter].items() + + def syncCommunicate(self, msg): + return self.communicate(msg) + diff --git a/secop/devices/core.py b/secop/devices/core.py index 138c41e..3ae83ca 100644 --- a/secop/devices/core.py +++ b/secop/devices/core.py @@ -67,6 +67,16 @@ class PARAM(object): return '%s(%s)' % (self.__class__.__name__, ', '.join( ['%s=%r' % (k, v) for k, v in sorted(self.__dict__.items())])) + def as_dict(self): + # used for serialisation only + return dict(description = self.description, + unit = self.unit, + readonly = self.readonly, + value = self.value, + timestamp = self.timestamp, + validator = repr(self.validator), + ) + # storage for CMDs settings (description + call signature...) class CMD(object): @@ -83,6 +93,12 @@ class CMD(object): return '%s(%s)' % (self.__class__.__name__, ', '.join( ['%s=%r' % (k, v) for k, v in sorted(self.__dict__.items())])) + def as_dict(self): + # used for serialisation only + return dict(description = self.description, + arguments = repr(self.arguments), + resulttype = repr(self.resulttype), + ) # Meta class # warning: MAGIC! diff --git a/secop/lib/__init__.py b/secop/lib/__init__.py index f84fc80..2a81cb4 100644 --- a/secop/lib/__init__.py +++ b/secop/lib/__init__.py @@ -22,6 +22,7 @@ """Define helpers""" +import threading class attrdict(dict): """a normal dict, providing access also via attributes""" @@ -52,6 +53,13 @@ def get_class(spec): return getattr(module, classname) +def mkthread(func, *args, **kwds): + t = threading.Thread(name='%s:%s' % (func.__module__, func.__name__), + target=func, args=args, kwargs=kwds) + t.daemon = True + t.start() + return t + if __name__ == '__main__': print "minimal testing: lib" d = attrdict(a=1, b=2) diff --git a/secop/lib/parsing.py b/secop/lib/parsing.py index 3f2127c..9f33fae 100644 --- a/secop/lib/parsing.py +++ b/secop/lib/parsing.py @@ -22,19 +22,106 @@ """Define parsing helpers""" +import re import time -import datetime +from datetime import tzinfo, timedelta, datetime + +# based on http://stackoverflow.com/a/39418771 +class LocalTimezone(tzinfo): + ZERO = timedelta(0) + STDOFFSET = timedelta(seconds = -time.timezone) + if time.daylight: + DSTOFFSET = timedelta(seconds = -time.altzone) + else: + DSTOFFSET = STDOFFSET + + DSTDIFF = DSTOFFSET - STDOFFSET + + def utcoffset(self, dt): + if self._isdst(dt): + return self.DSTOFFSET + else: + return self.STDOFFSET + + def dst(self, dt): + if self._isdst(dt): + return self.DSTDIFF + else: + return self.ZERO + + def tzname(self, dt): + return time.tzname[self._isdst(dt)] + + def _isdst(self, dt): + tt = (dt.year, dt.month, dt.day, + dt.hour, dt.minute, dt.second, + dt.weekday(), 0, 0) + stamp = time.mktime(tt) + tt = time.localtime(stamp) + return tt.tm_isdst > 0 + +LocalTimezone = LocalTimezone() + +def format_time(timestamp=None): + # get time in UTC + if timestamp is None: + d = datetime.now(LocalTimezone) + else: + d = datetime.fromtimestamp(timestamp, LocalTimezone) + return d.isoformat("T") -def format_time(timestamp): - return datetime.datetime.fromtimestamp( - timestamp).strftime("%Y-%m-%d %H:%M:%S.%f") +class Timezone(tzinfo): + def __init__(self, offset, name='unknown timezone'): + self.offset = offset + self.name = name + def tzname(self, dt): + return self.name + def utcoffset(self, dt): + return self.offset + def dst(self, dt): + return timedelta(0) +datetime_re = re.compile( + r'(?P\d{4})-(?P\d{1,2})-(?P\d{1,2})' + r'[T ](?P\d{1,2}):(?P\d{1,2})' + r'(?::(?P\d{1,2})(?:\.(?P\d{1,6})\d*)?)?' + r'(?PZ|[+-]\d{2}(?::?\d{2})?)?$' +) + +def _parse_isostring(isostring): + """Parses a string and return a datetime.datetime. + This function supports time zone offsets. When the input contains one, + the output uses a timezone with a fixed offset from UTC. + """ + match = datetime_re.match(isostring) + if match: + kw = match.groupdict() + if kw['microsecond']: + kw['microsecond'] = kw['microsecond'].ljust(6, '0') + _tzinfo = kw.pop('tzinfo') + if _tzinfo == 'Z': + _tzinfo = timezone.utc + elif _tzinfo is not None: + offset_mins = int(_tzinfo[-2:]) if len(_tzinfo) > 3 else 0 + offset_hours = int(_tzinfo[1:3]) + offset = timedelta(hours=offset_hours, minutes=offset_mins) + if _tzinfo[0] == '-': + offset = -offset + _tzinfo = Timezone(offset) + kw = {k: int(v) for k, v in kw.items() if v is not None} + kw['tzinfo'] = _tzinfo + return datetime(**kw) + raise ValueError("%s is not a valid ISO8601 string I can parse!" % isostring) + +def parse_time(isostring): + try: + return float(isostring) + except ValueError: + dt = _parse_isostring(isostring) + return time.mktime(dt.timetuple()) + dt.microsecond * 1e-6 -def parse_time(string): - d = datetime.datetime.strptime(string, "%Y-%m-%d %H:%M:%S.%f") - return time.mktime(d.timetuple()) + 0.000001 * d.microsecond - +# possibly unusable stuff below! def format_args(args): if isinstance(args, list): diff --git a/secop/protocol/dispatcher.py b/secop/protocol/dispatcher.py index c4ea9a1..e2864e7 100644 --- a/secop/protocol/dispatcher.py +++ b/secop/protocol/dispatcher.py @@ -116,7 +116,7 @@ class Dispatcher(object): listeners += list(self._dispatcher_active_connections) for conn in listeners: conn.queue_async_reply(msg) - + def announce_update(self, moduleobj, pname, pobj): """called by modules param setters to notify subscribers of new values """ @@ -142,7 +142,7 @@ class Dispatcher(object): self._dispatcher_connections.remove(conn) for _evt, conns in self._dispatcher_subscriptions.items(): conns.discard(conn) - + def activate_connection(self, conn): self._dispatcher_active_connections.add(conn) @@ -188,6 +188,22 @@ class Dispatcher(object): dd[modulename] = descriptive_data return dn, dd + def get_descriptive_data(self): + # XXX: be lazy and cache this? + result = {} + for modulename in self._dispatcher_export: + module = self.get_module(modulename) + dd = {'class' : module.__class__.__name__, + 'bases' : [b.__name__ for b in module.__class__.__bases__], + 'parameters': dict((pn,po.as_dict()) for pn,po in module.PARAMS.items()), + 'commands': dict((cn,co.as_dict()) for cn,co in module.CMDS.items()), + 'baseclass' : 'Readable', + } + result.setdefault('modules', {})[modulename] = dd + result['equipment_id'] = self.equipment_id + # XXX: what else? + return result + def list_module_params(self, modulename): self.log.debug('list_module_params(%r)' % modulename) if modulename in self._dispatcher_export: @@ -205,7 +221,7 @@ class Dispatcher(object): def _execute_command(self, modulename, command, arguments=None): if arguments is None: arguments = [] - + moduleobj = self.get_module(modulename) if moduleobj is None: raise NoSuchmoduleError(module=modulename) @@ -273,8 +289,7 @@ class Dispatcher(object): def handle_Describe(self, conn, msg): # XXX:collect descriptive data - # XXX:how to get equipment_id? - return DescribeReply(equipment_id = self.equipment_id, description = self.list_modules()) + return DescribeReply(equipment_id = self.equipment_id, description = self.get_descriptive_data()) def handle_Poll(self, conn, msg): # XXX: trigger polling and force sending event @@ -283,7 +298,7 @@ class Dispatcher(object): if conn in self._dispatcher_active_connections: return None # already send to myself return res # send reply to inactive conns - + def handle_Write(self, conn, msg): # notify all by sending WriteReply #msg1 = WriteReply(**msg.as_dict()) @@ -301,7 +316,7 @@ class Dispatcher(object): if conn in self._dispatcher_active_connections: return None # already send to myself return res # send reply to inactive conns - + def handle_Command(self, conn, msg): # notify all by sending CommandReply #msg1 = CommandReply(**msg.as_dict()) @@ -314,7 +329,7 @@ class Dispatcher(object): #if conn in self._dispatcher_active_connections: # return None # already send to myself return res # send reply to inactive conns - + def handle_Heartbeat(self, conn, msg): return HeartbeatReply(**msg.as_dict()) @@ -331,14 +346,14 @@ class Dispatcher(object): except SECOPError as e: self.log.error('decide what to do here!') self.log.exception(e) - res = Value(module=modulename, parameter=pname, - value=pobj.value, t=pobj.timestamp, + res = Value(module=modulename, parameter=pname, + value=pobj.value, t=pobj.timestamp, unit=pobj.unit) if res.value != Ellipsis: # means we do not have a value at all so skip this self.broadcast_event(res) conn.queue_async_reply(ActivateReply(**msg.as_dict())) return None - + def handle_Deactivate(self, conn, msg): self.deactivate_connection(conn) conn.queue_async_reply(DeactivateReply(**msg.as_dict())) @@ -353,7 +368,7 @@ class Dispatcher(object): (no handle_ method was defined) """ self.log.error('IGN: got unhandled request %s' % msg) - return ErrorMessage(errorclass="InternalError", + return ErrorMessage(errorclass="InternalError", errorstring = 'Got Unhandled Request %r' % msg) diff --git a/secop/protocol/encoding/demo_v4.py b/secop/protocol/encoding/demo_v4.py index 72fa03a..efc3ec2 100644 --- a/secop/protocol/encoding/demo_v4.py +++ b/secop/protocol/encoding/demo_v4.py @@ -25,6 +25,7 @@ # implement as class as they may need some internal 'state' later on # (think compressors) +from secop.lib.parsing import format_time from secop.protocol.encoding import MessageEncoder from secop.protocol.messages import * from secop.protocol.errors import ProtocollError @@ -42,7 +43,7 @@ DEMO_RE = re.compile( #""" # messagetypes: IDENTREQUEST = '*IDN?' # literal -IDENTREPLY = 'Sine2020WP7.1&ISSE, SECoP, V2016-11-30, rc1' # literal +IDENTREPLY = 'SECoP, SECoPTCP, V2016-11-30, rc1' # literal! first part 'SECoP' is fixed! DESCRIPTIONSREQUEST = 'describe' # literal DESCRIPTIONREPLY = 'describing' # + +json ENABLEEVENTSREQUEST = 'activate' # literal @@ -66,6 +67,12 @@ ERRORCLASSES = ['NoSuchDevice', 'NoSuchParameter', 'NoSuchCommand', 'CommandRunning', 'Disabled',] # note: above strings need to be unique in the sense, that none is/or starts with another +def encode_value_data(vobj): + q = vobj.qualifiers.copy() + if 't' in q: + q['t'] = format_time(q['t']) + return vobj.value, q + class DemoEncoder(MessageEncoder): # map of msg to msgtype string as defined above. ENCODEMAP = { @@ -88,7 +95,7 @@ class DemoEncoder(MessageEncoder): ErrorMessage : (ERRORREPLY, 'errorclass', 'errorinfo',), Value: (EVENT, lambda msg: "%s:%s" % (msg.module, msg.parameter or (msg.command+'()')) if msg.parameter or msg.command else msg.module, - lambda msg: [msg.value, msg.qualifiers] if msg.qualifiers else [msg.value]), + encode_value_data,), } DECODEMAP = { IDENTREQUEST : lambda spec, data: IdentifyRequest(),