polishing for a demo

+ adopting additional requests

Change-Id: If5ca29b5d247f1bc429ca101b0081b1d14f6e6f1
This commit is contained in:
Enrico Faulhaber
2017-01-25 11:47:19 +01:00
parent d5e935788f
commit 6ec30e38e8
43 changed files with 828 additions and 578 deletions

View File

@ -19,12 +19,12 @@
# Enrico Faulhaber <enrico.faulhaber@frm2.tum.de>
#
# *****************************************************************************
"""Define Client side proxies"""
import json
import socket
import serial
from select import select
import threading
import Queue
@ -45,6 +45,7 @@ class TCPConnection(object):
self._host = host
self._port = int(port)
self._thread = None
self.callbacks = [] # called if SEC-node shuts down
self.connect()
def connect(self):
@ -62,24 +63,33 @@ class TCPConnection(object):
data = u''
while True:
try:
newdata = self._io.recv(1024)
except socket.timeout:
newdata = u''
dlist = [self._io.fileno()]
rlist, wlist, xlist = select(dlist, dlist, dlist, 1)
if dlist[0] in rlist + wlist:
newdata = self._io.recv(1024)
if dlist[0] in xlist:
print "Problem: exception on socket, reconnecting!"
for cb, arg in self.callbacks:
cb(arg)
return
except socket.timeout:
pass
except Exception as err:
print err, "reconnecting"
self.connect()
data = u''
continue
for cb, arg in self.callbacks:
cb(arg)
return
data += newdata
while '\n' in data:
line, data = data.split('\n', 1)
try:
self._readbuffer.put(
line.strip('\r'), block=True, timeout=1)
self._readbuffer.put(line.strip('\r'),
block=True,
timeout=1)
except Queue.Full:
self.log.debug(
'rcv queue full! dropping line: %r' % line)
self.log.debug('rcv queue full! dropping line: %r' %
line)
finally:
self._thread = None
@ -147,8 +157,9 @@ class Client(object):
devport = opts.pop('device')
baudrate = int(opts.pop('baudrate', 115200))
self.contactPoint = "serial://%s:%s" % (devport, baudrate)
self.connection = serial.Serial(devport, baudrate=baudrate,
timeout=1)
self.connection = serial.Serial(
devport, baudrate=baudrate, timeout=1)
self.connection.callbacks = []
else:
host = opts.pop('connectto', 'localhost')
port = int(opts.pop('port', 10767))
@ -198,7 +209,7 @@ class Client(object):
self.secop_id = line
continue
msgtype, spec, data = self.decode_message(line)
if msgtype in ('update', 'changed'):
if msgtype in ('event', 'update', 'changed'):
# handle async stuff
self._handle_event(spec, data)
# handle sync stuff
@ -217,21 +228,20 @@ class Client(object):
if entry:
self.log.error("request %r resulted in Error %r" %
(data[0], spec))
entry.extend([True, EXCEPTIONS[spec](data)])
entry.extend([True, EXCEPTIONS[spec](*data)])
entry[0].set()
return
self.log.error("got an unexpected error %s %r" %
(spec, data[0]))
self.log.error("got an unexpected error %s %r" % (spec, data[0]))
return
if msgtype == "describing":
data = [spec, data]
spec = ''
entry = self.expected_replies.get((msgtype, spec), None)
entry = self.expected_replies.get((msgtype, ''), None)
else:
entry = self.expected_replies.get((msgtype, spec), None)
if entry:
self.log.debug("got expected reply '%s %s'" %
(msgtype, spec) if spec else
"got expected reply '%s'" % msgtype)
entry.extend([False, data])
self.log.debug("got expected reply '%s %s'" % (msgtype, spec)
if spec else "got expected reply '%s'" % msgtype)
entry.extend([False, msgtype, spec, data])
entry[0].set()
return
@ -282,8 +292,8 @@ class Client(object):
try:
mkthread(func, data)
except Exception as err:
self.log.exception(
'Exception in Single-shot Callback!', err)
self.log.exception('Exception in Single-shot Callback!',
err)
run.add(func)
self.single_shots[spec].difference_update(run)
@ -294,7 +304,8 @@ class Client(object):
return self._getDescribingModuleData(module)['parameters'][parameter]
def _issueDescribe(self):
self.equipment_id, self.describing_data = self.communicate('describe')
_, self.equipment_id, self.describing_data = self._communicate(
'describe')
for module, moduleData in self.describing_data['modules'].items():
for parameter, parameterData in moduleData['parameters'].items():
@ -303,17 +314,18 @@ class Client(object):
[parameter]['validator'] = validator
def register_callback(self, module, parameter, cb):
self.log.debug(
'registering callback %r for %s:%s' %
(cb, module, parameter))
self.log.debug('registering callback %r for %s:%s' %
(cb, module, parameter))
self.callbacks.setdefault('%s:%s' % (module, parameter), set()).add(cb)
def unregister_callback(self, module, parameter, cb):
self.log.debug(
'unregistering callback %r for %s:%s' %
(cb, module, parameter))
self.callbacks.setdefault('%s:%s' %
(module, parameter), set()).discard(cb)
self.log.debug('unregistering callback %r for %s:%s' %
(cb, module, parameter))
self.callbacks.setdefault('%s:%s' % (module, parameter),
set()).discard(cb)
def register_shutdown_callback(self, func, arg):
self.connection.callbacks.append((func, arg))
def _get_reply_from_request(self, requesttype):
# maps each (sync) request to the corresponding reply
@ -331,18 +343,24 @@ class Client(object):
return REPLYMAP.get(requesttype, requesttype)
def communicate(self, msgtype, spec='', data=None):
# only return the data portion....
return self._communicate(msgtype, spec, data)[2]
def _communicate(self, msgtype, spec='', data=None):
self.log.debug('communicate: %r %r %r' % (msgtype, spec, data))
if self.stopflag:
raise RuntimeError('alreading stopping!')
if msgtype == "*IDN?":
return self.secop_id
if msgtype not in ('*IDN?', 'describe', 'activate',
'deactivate', 'do', 'change', 'read', 'ping', 'help'):
raise EXCEPTIONS['Protocol'](errorclass='Protocol',
errorinfo='%r: No Such Messagetype defined!' %
msgtype,
origin=self.encode_message(msgtype, spec, data))
if msgtype not in ('*IDN?', 'describe', 'activate', 'deactivate', 'do',
'change', 'read', 'ping', 'help'):
raise EXCEPTIONS['Protocol'](args=[
self.encode_message(msgtype, spec, data),
dict(
errorclass='Protocol',
errorinfo='%r: No Such Messagetype defined!' % msgtype, ),
])
# sanitize input + handle syntactic sugar
msgtype = str(msgtype)
@ -356,7 +374,8 @@ class Client(object):
rply = self._get_reply_from_request(msgtype)
if (rply, spec) in self.expected_replies:
raise RuntimeError(
"can not have more than one requests of the same type at the same time!")
"can not have more than one requests of the same type at the same time!"
)
# prepare sending request
event = threading.Event()
@ -371,11 +390,14 @@ class Client(object):
# wait for reply. timeout after 10s
if event.wait(10):
self.log.debug('checking reply')
event, is_error, result = self.expected_replies.pop((rply, spec))
entry = self.expected_replies.pop((rply, spec))
# entry is: event, is_error, exc_or_msgtype [,spec, date]<- if !err
is_error = entry[1]
if is_error:
# if error, result contains the rigth Exception to raise
raise result
return result
# if error, entry[2] contains the rigth Exception to raise
raise entry[2]
# valid reply: entry[2:5] contain msgtype, spec, data
return tuple(entry[2:5])
# timed out
del self.expected_replies[(rply, spec)]
@ -447,17 +469,21 @@ class Client(object):
return self.describing_data['modules'][module]['parameters'].keys()
def getModuleBaseClass(self, module):
return self.describing_data['modules'][module]['baseclass']
return self.describing_data['modules'][module]['interfaceclass']
def getCommands(self, module):
return self.describing_data['modules'][module]['commands'].keys()
def getProperties(self, module, parameter):
return self.describing_data['modules'][
module]['parameters'][parameter]
return self.describing_data['modules'][module]['parameters'][parameter]
def syncCommunicate(self, *msg):
return self.communicate(*msg)
res = self._communicate(*msg)
try:
res = self.encode_message(*res)
except Exception:
res = str(res)
return res
def ping(self, pingctr=[0]):
pingctr[0] = pingctr[0] + 1