From 5f9344109d777633c2021596e981bc46c8b54e4a Mon Sep 17 00:00:00 2001 From: l_samenv Date: Tue, 8 Sep 2020 13:36:11 +0200 Subject: [PATCH] improve sea client --- secop_psi/sea.py | 116 +++++++++++++++++++++++++++-------------------- 1 file changed, 68 insertions(+), 48 deletions(-) diff --git a/secop_psi/sea.py b/secop_psi/sea.py index af20724..76f718f 100644 --- a/secop_psi/sea.py +++ b/secop_psi/sea.py @@ -41,7 +41,7 @@ from secop.modules import Module, Parameter, Command, Override, Drivable, Readab from secop.datatypes import StringType, FloatRange, ArrayOf, BoolType, IntRange, EnumType from secop.lib import mkthread, getGeneralConfig from secop.lib.asynconn import AsynConn, ConnectionClosed -from secop.metaclass import Done +from secop.metaclass import ModuleMeta, Done from secop.errors import HardwareError, secop_error, ConfigError from secop.client import ProxyClient from secop.protocol.dispatcher import make_update @@ -54,7 +54,6 @@ description = %(samenv)s over SEA [seaconn] class = secop_psi.sea.SeaClient description = a SEA connection -uri = %(uri)s """ CFG_MODULE = """ @@ -67,6 +66,7 @@ remote_paths = . SEA_DIR = expanduser('~/sea') +confdir = getGeneralConfig()['confdir'].split(':', 1)[0] def get_sea_port(instance): @@ -143,14 +143,14 @@ class SeaClient(ProxyClient, Module): def request(self, command): """send a request and wait for reply""" with self._write_lock: - if not self.io: + if not self.io or not self.io.connection: + if not self.asyncio.connection: + self._connect(None) self.io = AsynConn(self.uri) assert self.io.readline() == b'OK' self.io.writeline(b'seauser seaser') assert self.io.readline() == b'Login OK' - # do this before changing to user rights (avoid cluttering log) - #self.io.writeline(b'protocol set json') - #self.io.writeline(b'config rights seauser seaser') + self.io.flush_recv() self.io.writeline(('fulltransact %s' % command).encode()) result = None deadline = time.time() + 10 @@ -170,7 +170,6 @@ class SeaClient(ProxyClient, Module): print('missing TRANSACTIONSTART on: %s' % command) return '' if not result: - print('missing return value on: %s' % command) return '' return '\n'.join(result) if result is None: @@ -203,7 +202,7 @@ class SeaClient(ProxyClient, Module): continue else: continue - data = {'error %s' % path: readerror.replace('ERROR: ', '')} + data = {'%s.geterror' % path: readerror.replace('ERROR: ', '')} obj = None flag = 'hdbevent' else: @@ -217,22 +216,30 @@ class SeaClient(ProxyClient, Module): started_callback = None continue if flag != 'hdbevent': - print('SKIP', msg) + if obj != 'protocol': + print('SKIP', msg) continue if data is None: continue + now = time.time() for path, value in data.items(): readerror = None - if path.startswith('error '): - readerror = HardwareError(value) + if path.endswith('.geterror'): + if value: + readerror = HardwareError(value) + path = path.rsplit('.', 1)[0] value = None - path = path[6:] try: module, param = self.path2param[path] - self.updateValue(module, param, value, time.time(), readerror) except KeyError: # print('UNUSED', msg) - pass # unused parameters + continue # unused parameter + oldv, oldt, oldr = self.cache.get((module, param), [None, None, None]) + if value is None: + value = oldv + if value != oldv or str(readerror) != str(oldr) or abs(now - oldt) > 60: + # do not update unchanged values within 0.1 sec + self.updateValue(module, param, value, now, readerror) def do_communicate(self, command): reply = self.request(command) @@ -243,10 +250,9 @@ class SeaClient(ProxyClient, Module): reply = ''.join('' if line.startswith('WARNING') else line for line in reply.split('\n')) samenv, reply = json.loads(reply) samenv = samenv.replace('/', '_') - confdir = getGeneralConfig()['confdir'] result = [] with open(join(confdir, 'sea', samenv + '.cfg'), 'w') as cfp: - cfp.write(CFG_HEADER % dict(samenv=samenv, uri=self.uri)) + cfp.write(CFG_HEADER % dict(samenv=samenv)) for filename, obj, descr in reply: content = json.dumps([obj, descr]).replace('}, {', '},\n{') with open(join(confdir, 'sea', filename + '.json'), 'w') as fp: @@ -290,28 +296,24 @@ class SeaModule(Module): path2param = None sea_object = None - def __init__(self, name, logger, cfgdict, dispatcher): - self.buildParams(cfgdict, name) - Module.__init__(self, name, logger, cfgdict, dispatcher) - - def buildParams(self, cfgdict, name): + def __new__(cls, name, logger, cfgdict, dispatcher): visibility_level = cfgdict.pop('visibility_level', 2) json_descr = cfgdict.pop('json_descr') remote_paths = cfgdict.pop('remote_paths', '') if 'description' not in cfgdict: cfgdict['description'] = '%s (remote_paths=%s)' % (json_descr, remote_paths) - with open(join(getGeneralConfig()['confdir'], 'sea', json_descr + '.json')) as fp: - obj, descr = json.load(fp) + with open(join(confdir, 'sea', json_descr + '.json')) as fp: + sea_object, descr = json.load(fp) remote_paths = remote_paths.split() - self.sea_object = obj if remote_paths: result = [] for rpath in remote_paths: include = True for paramdesc in descr: - if paramdesc.get('visibility', 1) > visibility_level: - continue path = paramdesc['path'] + if paramdesc.get('visibility', 1) > visibility_level: + if not path.endswith('is_running'): + continue sub = path.split('/', 1) if rpath == '.': # take all except subpaths with readonly node at top if len(sub) == 1: @@ -326,8 +328,9 @@ class SeaModule(Module): main = '' else: # take all main = '' - self.path2param = {} - accessibles = {} + path2param = {} + parameters = {} + attributes = dict(sea_object=sea_object, path2param=path2param, parameters=parameters) for paramdesc in descr: path = paramdesc['path'] readonly = paramdesc.get('readonly', True) @@ -350,43 +353,61 @@ class SeaModule(Module): # flatten path to parameter name for i in reversed(range(len(pathlist))): key = '_'.join(pathlist[i:]) - if not key in self.accessibles: + if not key in cls.accessibles: break if key == 'is_running': kwds['export'] = False - self.path2param['/'.join(['', obj] + pathlist)] = (name, key) - if key in self.accessibles: - pobj = Override(**kwds).apply(self.accessibles[key]) + path2param['/'.join(['', sea_object] + pathlist)] = (name, key) + if key in cls.accessibles: + if key == 'target': + kwds['readonly'] = False + pobj = Override(**kwds) + datatype = kwds.get('datatype', cls.accessibles[key].datatype) else: pobj = Parameter(**kwds) - accessibles[key] = pobj - - if not hasattr(self, 'read_' + key): - def rfunc(module=self, cmd='hval %s' % path): + datatype = pobj.datatype + parameters[key] = pobj + if not hasattr(cls, 'read_' + key): + def rfunc(self, cmd='hval /sics/%s/%s' % (sea_object, path)): print('READ', cmd) - module._iodev.request(cmd) + reply = self._iodev.request(cmd) + print('REPLY', reply) + if reply.startswith('ERROR: '): + raise HardwareError(reply.split(' ', 1)[1]) + try: + reply = float(reply) + except ValueError: + pass # an updateEvent will be handled before above returns - return Done + return reply - setattr(self, 'read_' + key, rfunc) + attributes['read_' + key] = rfunc - if not (readonly or hasattr(self, 'write_' + key)): + if not (readonly or hasattr(cls, 'write_' + key)): # pylint wrongly complains 'Cell variable pobj defined in loop' # pylint: disable=cell-var-from-loop - def wfunc(value, module=self, datatype=pobj.datatype, command=paramdesc['cmd']): + def wfunc(self, value, datatype=datatype, command=paramdesc['cmd']): # TODO: convert to valid tcl data cmd = "%s %s" % (command, datatype.export_value(value)) print('WRITE', cmd) - module._iodev.request(cmd) + self._iodev.request(cmd) # an updateEvent will be handled before above returns return Done - setattr(self, 'write_' + key, wfunc) + attributes['write_' + key] = wfunc # create standard parameters like value and status, if not yet there - for pname, pobj in self.accessibles.items(): - if pname not in accessibles and isinstance(pobj, Parameter) and pname != 'pollinterval': - accessibles[pname] = Override(poll=False, needscfg=False).apply(pobj) - self.accessibles = accessibles + for pname, pobj in cls.accessibles.items(): + if pname == 'pollinterval': + parameters[pname] = Override(export=False) + elif pname not in parameters and isinstance(pobj, Parameter): + parameters[pname] = Override(poll=False, needscfg=False) + + classname = '%s_%s' % (cls.__name__, sea_object) + newcls = ModuleMeta.__new__(ModuleMeta, classname, (cls,), attributes) + return Module.__new__(newcls) + + def __init__(self, name, logger, cfgdict, dispatcher): + Module.__init__(self, name, logger, cfgdict, dispatcher) def updateEvent(self, module, parameter, value, timestamp, readerror): upd = getattr(self, 'update_' + parameter, None) @@ -450,7 +471,6 @@ class SeaDrivable(SeaModule, Drivable): def write_target(self, value): self._iodev.request('run %s %s' % (self.sea_object, value)) #self.status = [self.Status.BUSY, 'driving'] - print('TARGET', self.status) return value def update_status(self, value, timestamp, readerror):