From a5cc98d0b0505625aa6619a08effd033e98e442d Mon Sep 17 00:00:00 2001 From: l_samenv Date: Thu, 8 Apr 2021 10:22:53 +0200 Subject: [PATCH] various improvements on the sea client --- secop_psi/sea.py | 303 +++++++++++++++++++++++++++++++---------------- 1 file changed, 203 insertions(+), 100 deletions(-) diff --git a/secop_psi/sea.py b/secop_psi/sea.py index b90a19f..a43a190 100644 --- a/secop_psi/sea.py +++ b/secop_psi/sea.py @@ -41,28 +41,25 @@ from os.path import expanduser, join, exists from secop.client import ProxyClient from secop.datatypes import ArrayOf, BoolType, \ EnumType, FloatRange, IntRange, StringType -from secop.errors import ConfigError, HardwareError, secop_error +from secop.errors import ConfigError, HardwareError, secop_error, NoSuchModuleError from secop.lib import getGeneralConfig, mkthread from secop.lib.asynconn import AsynConn, ConnectionClosed from secop.modules import Attached, Command, Done, Drivable, \ - Module, Parameter, Readable, Writable + Module, Parameter, Property, Readable, Writable from secop.protocol.dispatcher import make_update -CFG_HEADER = """[NODE] -id = %(samenv)s.psi.ch -description = %(samenv)s over SEA - -[seaconn] +CFG_HEADER = """[seaconn] class = secop_psi.sea.SeaClient -description = a SEA connection +description = SEA connection to %(samenv)s +config = %(config)s +%(export)s """ CFG_MODULE = """ [%(module)s] class = secop_psi.sea.%(modcls)s iodev = seaconn -json_descr = %(descr)s -remote_paths = . +sea_object = %(module)s """ @@ -95,6 +92,13 @@ class SeaClient(ProxyClient, Module): uri = Parameter('hostname:portnumber', datatype=StringType(), default='localhost:5000') timeout = Parameter('timeout', datatype=FloatRange(0), default=10) + config = Property("""needed SEA configuration, space separated + + Example: "ori4.config ori4.stick" + """, StringType(), default='') + service = Property("main/stick/addons", StringType(), default='') + visibility = 'expert' + default_json_file = {} def __init__(self, name, log, opts, srv): instance = srv.node_cfg['name'].rsplit('_', 1)[0] @@ -107,6 +111,9 @@ class SeaClient(ProxyClient, Module): self.shutdown = False self.path2param = {} self._write_lock = threading.Lock() + config = opts.get('config') + if config: + self.default_json_file[name] = config.split()[0] + '.json' self.io = None ProxyClient.__init__(self) Module.__init__(self, name, log, opts, srv) @@ -126,8 +133,10 @@ class SeaClient(ProxyClient, Module): assert self.asyncio.readline() == b'OK' self.asyncio.writeline(b'Spy 007') assert self.asyncio.readline() == b'Login OK' - # the json protocol is better for updates - self.asyncio.writeline(b'protocol set json') + self.request('frappy_config %s %s' % (self.service, self.config)) + + # frappy_async_client switches to the json protocol (better for updates) + self.asyncio.writeline(b'frappy_async_client') self.asyncio.writeline(('get_all_param ' + ' '.join(self.objects)).encode()) mkthread(self._rxthread, started_callback) @@ -141,7 +150,9 @@ class SeaClient(ProxyClient, Module): assert self.io.readline() == b'OK' self.io.writeline(b'seauser seaser') assert self.io.readline() == b'Login OK' + print('connected to %s' % self.uri) self.io.flush_recv() + # print('> %s' % command) self.io.writeline(('fulltransact %s' % command).encode()) result = None deadline = time.time() + 10 @@ -153,6 +164,7 @@ class SeaClient(ProxyClient, Module): except ConnectionClosed: break reply = reply.decode() + # print('< %s' % reply) if reply.startswith('TRANSACTIONSTART'): result = [] continue @@ -207,7 +219,7 @@ class SeaClient(ProxyClient, Module): started_callback = None continue if flag != 'hdbevent': - if obj != 'protocol': + if obj != 'frappy_async_client': print('SKIP', msg) continue if data is None: @@ -223,6 +235,15 @@ class SeaClient(ProxyClient, Module): try: module, param = self.path2param[path] except KeyError: + if path.startswith('/device'): + if path == '/device/changetime': + result = self.request('check_config %s %s' % (self.service, self.config)) + if result == '1': + self.asyncio.writeline(('get_all_param ' + ' '.join(self.objects)).encode()) + else: + self.DISPATCHER.shutdown() + elif path.startswith('/device/frappy_%s' % self.service) and value == '': + self.DISPATCHER.shutdown() # print('UNUSED', msg) continue # unused parameter oldv, oldt, oldr = self.cache.get((module, param), [None, None, None]) @@ -232,7 +253,7 @@ class SeaClient(ProxyClient, Module): # do not update unchanged values within 0.1 sec self.updateValue(module, param, value, now, readerror) - @Command + @Command(StringType(), result=StringType()) def communicate(self, command): """send a command to SEA""" reply = self.request(command) @@ -245,26 +266,54 @@ class SeaClient(ProxyClient, Module): reply = ''.join('' if line.startswith('WARNING') else line for line in reply.split('\n')) samenv, reply = json.loads(reply) samenv = samenv.replace('/', '_') + modules = {} + modcls = {} + for filename, obj, descr in reply: + if filename not in modules: + modules[filename] = {} + if descr['params'][0].get('cmd', '').startswith('run '): + modcls[obj] = 'SeaDrivable' + elif not descr['params'][0].get('readonly', True): + modcls[obj] = 'SeaWritable' + else: + modcls[obj] = 'SeaReadable' + modules.setdefault(filename, {})[obj] = descr + result = [] - with open(join(seaconfdir, samenv + '.cfg'), 'w') as cfp: - cfp.write(CFG_HEADER % dict(samenv=samenv)) - for filename, obj, descr in reply: - content = json.dumps([obj, descr]).replace('}, {', '},\n{') - with open(join(seaconfdir, filename + '.json'), 'w') as fp: - fp.write(content + '\n') - if descr[0].get('cmd', '').startswith('run '): - modcls = 'SeaDrivable' - else: - modcls = 'SeaReadable' - cfp.write(CFG_MODULE % dict(modcls=modcls, module=obj, descr=filename)) - result.append(filename) - return '\n'.join(result) + for filename, descr in modules.items(): + stripped = filename.rpartition('.')[0] + with open(join(seaconfdir, stripped + '.cfg'), 'w') as fp: + fp.write(CFG_HEADER % dict(samenv=samenv, config=filename, + export='' if filename.endswith('.config') else 'export=False')) + for obj in descr: + fp.write(CFG_MODULE % dict(modcls=modcls[obj], module=obj)) + content = json.dumps(descr).replace('}, {', '},\n{') + with open(join(seaconfdir, filename + '.json'), 'w') as fp: + fp.write(content + '\n') + result.append('%s: %s' % (filename, ','.join(n for n in descr))) + return '; '.join(result) + + @Command(StringType(), result=StringType()) + def query(self, cmd): + """a request checking for errors and accepting 0 or 1 line as result""" + errors = [] + reply = None + for line in self.request(cmd).split('\n'): + if line.strip().startswith('ERROR:'): + errors.append(line[6:].strip()) + elif reply is None: + reply = line.strip() + else: + raise HardwareError('SEA: superfluous reply %r to %r' % (reply, cmd)) + if errors: + raise HardwareError('; '.join(errors)) + return reply SEA_TO_SECOPTYPE = { 'float': FloatRange(), 'text': StringType(), - 'int': IntRange(), + 'int': IntRange(-1 << 63, 1 << 63 - 1), 'bool': BoolType(), 'none': None, 'floatvarar': ArrayOf(FloatRange(), 0, 400), # 400 is the current limit for proper notify events in SEA @@ -288,42 +337,86 @@ class SeaModule(Module): # pollerClass=None path2param = None sea_object = None + hdbpath = None # hdbpath for main writable - def __new__(cls, name, logger, cfgdict, dispatcher): + def __new__(cls, name, logger, cfgdict, srv): + if hasattr(srv, 'extra_sea_modules'): + extra_modules = srv.extra_sea_modules + else: + extra_modules = {} + srv.extra_sea_modules = extra_modules + json_file = cfgdict.pop('json_file', None) or SeaClient.default_json_file[cfgdict['iodev']] visibility_level = cfgdict.pop('visibility_level', 2) - json_descr = cfgdict.pop('json_descr') - remote_paths = cfgdict.pop('remote_paths', '') - if 'description' not in cfgdict: - cfgdict['description'] = '%s (remote_paths=%s)' % (json_descr, remote_paths) - with open(join(seaconfdir, json_descr + '.json')) as fp: - sea_object, descr = json.load(fp) - remote_paths = remote_paths.split() - if remote_paths: - result = [] - for rpath in remote_paths: - include = True - for paramdesc in descr: - path = paramdesc['path'] - if paramdesc.get('visibility', 1) > visibility_level: - if not path.endswith('is_running'): - continue - sub = path.split('/', 1) - if rpath == '.': # take all except subpaths with readonly node at top - if len(sub) == 1: - include = paramdesc.get('kids', 0) == 0 or not paramdesc.get('readonly', True) - if include or path == '': + + single_module = cfgdict.pop('single_module', None) + if single_module: + sea_object, base, paramdesc = extra_modules[single_module] + params = [paramdesc] + paramdesc['key'] = 'value' + if issubclass(cls, SeaWritable): + if paramdesc.get('readonly', True): + raise ConfigError('%s is not writable' % sea_object) + targetdesc = dict(paramdesc, key='target') + params.append(targetdesc) + paramdesc['readonly'] = True + # print('SINGLE %s/%s %s %r' % (base, paramdesc['path'], cls.__name__, params)) + extra_module_set = () + if 'description' not in cfgdict: + cfgdict['description'] = '%s@%s' % (single_module, json_file) + else: + sea_object = cfgdict.pop('sea_object') + rel_paths = cfgdict.pop('rel_paths', '.') + if 'description' not in cfgdict: + cfgdict['description'] = '%s@%s%s' % ( + name, json_file, '' if rel_paths == '.' else ' (rel_paths=%s)' % rel_paths) + + # with open(join(seaconfdir, json_file + '.json')) as fp: + # sea_object, descr = json.load(fp) + with open(join(seaconfdir, json_file)) as fp: + descr = json.load(fp)[sea_object] + if rel_paths == '*' or not rel_paths: + # take all + main = descr['params'][0] + # assert main['path'] == '' # TODO: check cases where this fails + main['key'] = 'value' + else: + # filter by relative paths + rel_paths = rel_paths.split() + result = [] + for rpath in rel_paths: + include = True + for paramdesc in descr['params']: + path = paramdesc['path'] + if paramdesc.get('visibility', 1) > visibility_level: + if not path.endswith('is_running'): + continue + sub = path.split('/', 1) + if rpath == '.': # take all except subpaths with readonly node at top + if len(sub) == 1: + include = paramdesc.get('kids', 0) == 0 or not paramdesc.get('readonly', True) + if include or path == '': + result.append(paramdesc) + elif sub[0] == rpath: result.append(paramdesc) - elif sub[0] == rpath: - result.append(paramdesc) - descr = result - main = remote_paths[0] - if main == '.': - main = '' - else: # take all - main = '' + descr['params'] = result + if result[0]['path'] != '': + pass # TODO: check these cases + result[0]['key'] = 'value' + logger.info('PARAMS %s %r', name, result) + base = descr['base'] + params = descr['params'] + extra_module_set = cfgdict.pop('extra_modules', ()) + if extra_module_set: + extra_module_set = set(extra_module_set.replace(',', ' ').split()) path2param = {} attributes = dict(sea_object=sea_object, path2param=path2param) - for paramdesc in descr: + + # some guesses about visibility (may be overriden in *.cfg): + if sea_object in ('table', 'cc'): + attributes['visibility'] = 2 + elif base.count('/') > 1: + attributes['visibility'] = 2 + for paramdesc in params: path = paramdesc['path'] readonly = paramdesc.get('readonly', True) dt = get_datatype(paramdesc) @@ -334,9 +427,8 @@ class SeaModule(Module): if kwds['datatype'] is None: kwds.update(visibility=3, default='', datatype=StringType()) pathlist = path.split('/') if path else [] - if path == main: - key = 'value' - else: + key = paramdesc.get('key') # will be None, 'value' or 'target' + if key is None: if len(pathlist) > 0: if len(pathlist) == 1: kwds['group'] = 'more' @@ -350,7 +442,6 @@ class SeaModule(Module): break if key == 'is_running': kwds['export'] = False - path2param['/'.join(['', sea_object] + pathlist)] = (name, key) if key in cls.accessibles: if key == 'target': kwds['readonly'] = False @@ -359,32 +450,41 @@ class SeaModule(Module): else: pobj = Parameter(**kwds) datatype = pobj.datatype + hdbpath = '/'.join([base] + pathlist) + if key in extra_module_set: + extra_modules[name + '.' + key] = sea_object, base, paramdesc + continue # skip this parameter + path2param[hdbpath] = (name, key) + logger.info('PARAM %s %s %s', hdbpath, name, key) attributes[key] = pobj - if not hasattr(cls, 'read_' + key): - def rfunc(self, cmd='hval /sics/%s/%s' % (sea_object, path)): - print('READ', cmd) - reply = self._iodev.request(cmd) - print('REPLY', reply) - if reply.startswith('ERROR: '): - raise HardwareError(reply.split(' ', 1)[1]) - try: - reply = float(reply) - except ValueError: - pass - # an updateEvent will be handled before above returns - return reply + # if hasattr(cls, 'read_' + key): + # print('override %s.read_%s' % (cls.__name__, key)) - attributes['read_' + key] = rfunc + def rfunc(self, cmd='hval /sics/%s/%s' % (sea_object, path)): + print('READ', cmd) + reply = self._iodev.query(cmd) + print('REPLY', reply) + try: + reply = float(reply) + except ValueError: + pass + # an updateEvent will be handled before above returns + return reply + + attributes['read_' + key] = rfunc + + if not readonly: + if hasattr(cls, 'write_' + key): + print('override %s.write_%s' % (cls.__name__, key)) - if not (readonly or hasattr(cls, 'write_' + key)): - # pylint wrongly complains 'Cell variable pobj defined in loop' - # pylint: disable=cell-var-from-loop def wfunc(self, value, datatype=datatype, command=paramdesc['cmd']): - # TODO: convert to valid tcl data - cmd = "%s %s" % (command, datatype.export_value(value)) - print('WRITE', cmd) - self._iodev.request(cmd) - # an updateEvent will be handled before above returns + value = datatype.export_value(value) + if isinstance(value, bool): + value = int(value) + # TODO: check if more has to be done for valid tcl data (strings?) + cmd = "%s %s" % (command, value) + self._iodev.query(cmd) + print('WRITE %s' % cmd) return Done attributes['write_' + key] = wfunc @@ -397,18 +497,20 @@ class SeaModule(Module): attributes[pname] = pobj.override(poll=False, needscfg=False) classname = '%s_%s' % (cls.__name__, sea_object) + # print(name, attributes) newcls = type(classname, (cls,), attributes) return Module.__new__(newcls) - # def __init__(self, name, logger, cfgdict, dispatcher): - # Module.__init__(self, name, logger, cfgdict, dispatcher) - def updateEvent(self, module, parameter, value, timestamp, readerror): upd = getattr(self, 'update_' + parameter, None) if upd: upd(value, timestamp, readerror) return - pobj = self.parameters[parameter] + try: + pobj = self.parameters[parameter] + except KeyError: + print(self.name, parameter) + raise pobj.timestamp = timestamp #if not pobj.readonly and pobj.value != value: # print('UPDATE', module, parameter, value) @@ -422,9 +524,6 @@ class SeaModule(Module): pobj.readerror = readerror self.DISPATCHER.broadcast_event(make_update(self.name, pobj)) - #def earlyInit(self): - # self.path2param = {k % subst: v for k, v in self.path2param.items()} - def initModule(self): self._iodev.register_obj(self, self.sea_object) super().initModule() @@ -445,25 +544,26 @@ class SeaReadable(SeaModule, Readable): class SeaWritable(SeaModule, Writable): - pass + def read_value(self): + return self.target + + def update_target(self, value, timestamp, readerror): + if not readerror: + self.value = value class SeaDrivable(SeaModule, Drivable): _sea_status = '' _is_running = 0 - #def buildParams(self, cfgdict, name): - # # insert here special treatment for status and target - # super().buildParams(cfgdict) - def read_status(self): return self.status - def read_target(self): - return self.target + # def read_target(self): + # return self.target def write_target(self, value): - self._iodev.request('run %s %s' % (self.sea_object, value)) + self._iodev.query('run %s %s' % (self.sea_object, value)) #self.status = [self.Status.BUSY, 'driving'] return value @@ -488,3 +588,6 @@ class SeaDrivable(SeaModule, Drivable): def updateTarget(self, module, parameter, value, timestamp, readerror): if value is not None: self.target = value + + def stop(self): + self._iodev.query('%s is_running 0' % self.sea_object) \ No newline at end of file