# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
#
# Module authors:
#   Markus Zolliker
# *****************************************************************************
"""generic SEA driver

an object or subobject in sea may be assigned to a SECoP module

Examples:

SECoP       SEA          hipadaba path   mod.obj  mod.sub  par.sub   mod.path
-------------------------------------------------------------------------------
tt:maxwait  tt           /tt/maxwait     tt       maxwait            /tt
tt:ramp     tt set/ramp  /tt/set/ramp    tt                set/ramp  /tt
t1:raw      tt t1/raw    /tt/t1/raw      tt       t1       raw       /tt/t1
rx:bla      rx bla       /some/rx_a/bla  rx       bla                /some/rx_a
"""

import json
import threading
import time
import os
from pathlib import Path

from frappy.client import ProxyClient
from frappy.datatypes import ArrayOf, BoolType, \
    EnumType, FloatRange, IntRange, StringType, StatusType
from frappy.core import IDLE, BUSY, WARN, ERROR, DISABLED
from frappy.errors import ConfigError, HardwareError, ReadFailedError, \
    CommunicationFailedError, ProgrammingError
from frappy.lib import generalConfig, mkthread, lazy_property
from frappy.lib.asynconn import AsynConn, ConnectionClosed
from frappy.modulebase import Done
from frappy.modules import Attached, Command, Drivable, \
    Module, Parameter, Property, Readable, Writable


# templates for the *_cfg.py files written by SeaConfigCreator.startModule
CFG_HEADER = """Node('%(config)s.sea.psi.ch',
    '''%(nodedescr)s''',
)
Mod(%(seaconn)r,
    'frappy_psi.sea.SeaClient',
    '%(service)s sea connection for %(config)s',
    config = %(config)r,
    service = %(service)r,
)
"""

CFG_MODULE = """Mod(%(module)r,
    'frappy_psi.sea.%(modcls)s', '',
    io = %(seaconn)r,
    sea_object = %(module)r,
)
"""

# maps the extension of a SEA description file name to the service name
SERVICE_NAMES = {
    'config': 'main',
    'stick': 'stick',
    'addon': 'addons',
}

SEA_DIR = Path('~/sea').expanduser()


def get_sea_port(instance):
    """return the SEA server port of *instance* as a string, or None

    Looks in SEA_DIR, first in 'sea_<instance>.tcl', then in 'sea.tcl', for
    a line consisting of three whitespace separated items, the second one
    being 'serverport'. The third item is returned unconverted.
    Missing files are skipped.
    """
    for filename in ('sea_%s.tcl' % instance, 'sea.tcl'):
        try:
            with (SEA_DIR / filename).open(encoding='utf-8') as f:
                for line in f:
                    linesplit = line.split()
                    if len(linesplit) == 3:
                        _, var, value = linesplit
                        if var == 'serverport':
                            return value
        except FileNotFoundError:
            pass
    return None


class SeaConfig:
    """holder for the (lazily determined) directory of the sea config files"""

    @lazy_property
    def dir(self):
        """directory containing the sea json/cfg files

        The environment variable FRAPPY_SEA_DIR is used when it is set and
        points to an existing path. Otherwise the 'sea' subdirectories of
        generalConfig.confdir are tried in order; when none exists, the
        last candidate is returned anyway.
        """
        seaconfdir = os.environ.get('FRAPPY_SEA_DIR')
        if seaconfdir is None or not Path(seaconfdir).expanduser().absolute().exists():
            for confdir in generalConfig.confdir:
                seaconfdir = confdir / 'sea'
                if seaconfdir.exists():
                    break
        else:
            seaconfdir = Path(seaconfdir).expanduser().absolute()
        return seaconfdir


seaconfig = SeaConfig()


class SeaClient(ProxyClient, Module):
    """connection to SEA"""

    uri = Parameter('hostname:portnumber', datatype=StringType(), default='localhost:5000')
    timeout = Parameter('timeout for connecting and requests',
                        datatype=FloatRange(0), default=10)
    config = Property("""needed SEA configuration, space separated

                      Example: "ori4.config ori4.stick"
                      """, StringType(), default='')
    service = Property("main/stick/addons", StringType(), default='')
    visibility = 'expert'
    # shared on class level on purpose: SeaModule.__new__ looks up the
    # default json file by the name of the io module
    default_json_file = {}
    _instance = None
    _last_connect = 0

    def __init__(self, name, log, opts, srv):
        """initialize connection state

        When no uri is configured, the port is taken from the sea tcl file
        of the instance derived from the node name (see get_sea_port).

        :raises ConfigError: when no uri is given and no port can be found
        """
        nodename = srv.node_cfg.get('name') or srv.node_cfg.get('equipment_id')
        instance = nodename.rsplit('_', 1)[0]
        if 'uri' not in opts:
            self._instance = instance
            port = get_sea_port(instance)
            if port is None:
                raise ConfigError('missing sea port for %s' % instance)
            opts['uri'] = {'value': 'tcp://localhost:%s' % port}
        self.objects = set()
        self.shutdown = False
        self.path2param = {}
        self._write_lock = threading.RLock()
        self._connect_thread = None
        self._connected = threading.Event()
        config = opts.get('config')
        if isinstance(config, dict):
            config = config['value']
        if config:
            self.default_json_file[name] = config.split()[0] + '.json'
        self.syncio = None
        self.asynio = None
        ProxyClient.__init__(self)
        Module.__init__(self, name, log, opts, srv)

    def doPoll(self):
        """(re)connect in a separate thread when not connected

        A new attempt is started at most once per timeout period.
        """
        if not self._connected.is_set() and time.time() > self._last_connect + self.timeout:
            # NOTE(review): this logs 'reconnect' only while _last_connect is
            # still 0, i.e. before the very first attempt - confirm whether
            # the condition is inverted
            if not self._last_connect:
                self.log.info('reconnect to SEA %s', self.service)
            if self._connect_thread is None:
                self._connect_thread = mkthread(self._connect)
            self._connected.wait(self.timeout)

    def register_obj(self, module, obj):
        """register a SeaModule: remember its sea object and its hdb paths"""
        self.objects.add(obj)
        for k, v in module.path2param.items():
            self.path2param.setdefault(k, []).extend(v)
        self.register_callback(module.name, module.updateEvent)

    def _connect(self):
        """open the async and the sync connection to SEA and log in

        On success the receive thread is started and the connected event
        is set.

        :raises CommunicationFailedError: on an unexpected reply during login
            or from frappy_config
        """
        try:
            if self.syncio:
                try:
                    self.syncio.disconnect()
                except Exception:
                    pass  # best effort: the old connection is dropped anyway
            self._last_connect = time.time()
            if self._instance:
                try:
                    from servicemanager import SeaManager  # pylint: disable=import-outside-toplevel
                    SeaManager().do_start(self._instance)
                except ImportError:
                    pass
            if '//' not in self.uri:
                self.uri = 'tcp://' + self.uri
            self.asynio = AsynConn(self.uri)
            reply = self.asynio.readline()
            if reply != b'OK':
                raise CommunicationFailedError('reply %r should be "OK"' % reply)
            for _ in range(2):
                self.asynio.writeline(b'Spy 007')
                reply = self.asynio.readline()
                if reply == b'Login OK':
                    break
            else:
                raise CommunicationFailedError('reply %r should be "Login OK"' % reply)
            self.syncio = AsynConn(self.uri)
            # explicit checks instead of assert: asserts vanish with python -O
            reply = self.syncio.readline()
            if reply != b'OK':
                raise CommunicationFailedError('reply %r should be "OK"' % reply)
            self.syncio.writeline(b'seauser seaser')
            reply = self.syncio.readline()
            if reply != b'Login OK':
                raise CommunicationFailedError('reply %r should be "Login OK"' % reply)
            if self.service and self.config:
                result = self.raw_request('frappy_config %s %s' % (self.service, self.config))
                if result.startswith('ERROR:'):
                    raise CommunicationFailedError(f'reply from frappy_config: {result}')
            # frappy_async_client switches to the json protocol (better for updates)
            self.asynio.writeline(b'frappy_async_client')
            self.asynio.writeline(('get_all_param ' + ' '.join(self.objects)).encode())
            self.log.info('connected to %s', self.uri)
            self._connected.set()
            mkthread(self._rxthread)
        finally:
            self._connect_thread = None

    def request(self, command, quiet=False):
        """send a request when connected and return the reply

        :raises ConnectionClosed: when not connected. In this case a poll
            is triggered, unless a connect thread is already running.
        """
        with self._write_lock:
            if not self._connected.is_set():
                if self._connect_thread is None:
                    # let doPoll do the reconnect
                    self.pollInfo.trigger(True)
                raise ConnectionClosed('disconnected - reconnect is tried later')
            return self.raw_request(command, quiet)

    def raw_request(self, command, quiet=False):
        """send a request and wait for reply"""
        try:
            self.syncio.flush_recv()
            ft = 'fulltransAct' if quiet else 'fulltransact'
            self.syncio.writeline(('%s %s' % (ft, command)).encode())
            result = None
            deadline = time.time() + self.timeout
            while time.time() < deadline:
                reply = self.syncio.readline()
                if reply is None:
                    continue
                reply = reply.decode()
                if reply.startswith('TRANSACTIONSTART'):
                    result = []
                    continue
                if reply == 'TRANSACTIONFINISHED':
                    if result is None:
                        self.log.info('missing TRANSACTIONSTART on: %s', command)
                        return ''
                    if not result:
                        return ''
                    return '\n'.join(result)
                if result is None:
                    # anything before TRANSACTIONSTART does not belong to us
                    self.log.info('swallow: %s', reply)
                    continue
                if not result:
                    # first line: keep only what follows the first '='
                    result = [reply.split('=', 1)[-1]]
                else:
                    result.append(reply)
            # report the configured timeout, not a hard coded value
            raise TimeoutError('no response within %gs' % self.timeout)
        except ConnectionClosed:
            self.close_connections()
            raise

    def close_connections(self):
        """disconnect both connections and clear the connected event"""
        connections = self.syncio, self.asynio
        self.syncio = self.asynio = None
        for conn in connections:
            if conn is None:
                continue
            try:
                conn.disconnect()
            except Exception:
                pass  # best effort: we are tearing down anyway
        self._connected.clear()

    def _rxthread(self):
        """receive thread: handle the json updates on the async connection"""
        recheck = None
        while not self.shutdown:
            if recheck and time.time() > recheck:
                # try to collect device changes within 1 sec
                recheck = None
                result = self.request('check_config %s %s' % (self.service, self.config))
                if result == '1':
                    self.asynio.writeline(('get_all_param ' + ' '.join(self.objects)).encode())
                else:
                    self.secNode.srv.shutdown()
            try:
                reply = self.asynio.readline()
                if reply is None:
                    continue
            except ConnectionClosed:
                self.close_connections()
                break
            try:
                msg = json.loads(reply)
            except Exception as e:
                self.log.warn('bad reply %r %r', e, reply)
                continue
            if isinstance(msg, str):
                if msg.startswith('_E '):
                    try:
                        _, path, readerror = msg.split(None, 2)
                    except ValueError:
                        continue
                else:
                    continue
                # path from sea may contain double slash //
                # this should be fixed, however in the meantime fix it here
                path = path.replace('//', '/')
                data = {'%s.geterror' % path: readerror.replace('ERROR: ', '')}
                obj = None
                flag = 'hdbevent'
            else:
                obj = msg['object']
                flag = msg['flag']
                data = msg['data']
            if flag == 'finish' and obj == 'get_all_param':
                # first updates have finished
                continue
            if flag != 'hdbevent':
                if obj not in ('frappy_async_client', 'get_all_param'):
                    self.log.debug('skip %r', msg)
                continue
            if not data:
                continue
            if not isinstance(data, dict):
                self.log.debug('what means %r', msg)
                continue
            now = time.time()
            for path, value in data.items():
                readerror = None
                if path.endswith('.geterror'):
                    if value:
                        # TODO: add mechanism in SEA to indicate hardware errors
                        readerror = ReadFailedError(value)
                    path = path.rsplit('.', 1)[0]
                    value = None
                mplist = self.path2param.get(path)
                if mplist is None:
                    if path.startswith('/device'):
                        if path == '/device/changetime':
                            recheck = time.time() + 1
                        elif path.startswith('/device/frappy_%s' % self.service) and value == '':
                            self.secNode.srv.shutdown()
                else:
                    for module, param in mplist:
                        oldv, oldt, oldr = self.cache.get((module, param), [None, None, None])
                        if value is None:
                            value = oldv
                        # oldt is None for a not yet cached parameter: guard
                        # the subtraction, which would raise a TypeError and
                        # terminate this thread
                        stale = oldt is not None and abs(now - oldt) > 60
                        if value != oldv or str(readerror) != str(oldr) or stale:
                            # do not update unchanged values within 60 sec
                            self.updateValue(module, param, value, now, readerror)

    @Command(StringType(), result=StringType())
    def communicate(self, command):
        """send a command to SEA"""
        reply = self.request(command)
        return reply

    @Command(StringType(), result=StringType())
    def query(self, cmd, quiet=False):
        """a request checking for errors and accepting 0 or 1 line as result"""
        errors = []
        reply = None
        for line in self.request(cmd, quiet).split('\n'):
            # strip first, so that the 'ERROR:' prefix is cut at the right place
            line = line.strip()
            if line.startswith('ERROR:'):
                errors.append(line[6:].strip())
            elif reply is None:
                reply = line
            else:
                self.log.info('SEA: superfluous reply %r to %r', reply, cmd)
        if errors:
            raise HardwareError('; '.join(errors))
        return reply


class SeaConfigCreator(SeaClient):
    """SeaClient variant writing the config files on startup, then exiting"""

    def startModule(self, start_events):
        """save objects (and sub-objects) description and exit"""
        self._connect()
        reply = self.request('describe_all')
        reply = ''.join('' if line.startswith('WARNING') else line for line in reply.split('\n'))
        description, reply = json.loads(reply)
        modules = {}
        modcls = {}
        for filename, obj, descr in reply:
            # the first parameter description decides about the module class
            main = descr['params'][0]
            if main.get('cmd', '').startswith('run '):
                modcls[obj] = 'SeaDrivable'
            elif not main.get('readonly', True):
                modcls[obj] = 'SeaWritable'
            else:
                modcls[obj] = 'SeaReadable'
            modules.setdefault(filename, {})[obj] = descr

        result = []
        for filename, descr in modules.items():
            stripped, _, ext = filename.rpartition('.')
            service = SERVICE_NAMES[ext]
            seaconn = 'sea_' + service
            cfgfile = seaconfig.dir / (stripped + '_cfg.py')
            with cfgfile.open('w', encoding='utf-8') as fp:
                fp.write(CFG_HEADER % {'config': filename, 'seaconn': seaconn, 'service': service,
                                       'nodedescr': description.get(filename, filename)})
                for obj in descr:
                    fp.write(CFG_MODULE % {'modcls': modcls[obj], 'module': obj, 'seaconn': seaconn})
            # insert line breaks in order to make the json file better readable
            content = json.dumps(descr).replace('}, {', '},\n{').replace('[{', '[\n{').replace('}]}, ', '}]},\n\n')
            result.append('%s\n' % cfgfile)
            fpath = seaconfig.dir / (filename + '.json')
            fpath.write_text(content + '\n', encoding='utf-8')
            result.append('%s: %s' % (filename, ','.join(descr)))
        raise SystemExit('; '.join(result))

    @Command(StringType(), result=StringType())
    def query(self, cmd, quiet=False):
        """a request checking for errors and accepting 0 or 1 line as result"""
        # in contrast to SeaClient.query, the quiet argument is not forwarded
        errors = []
        reply = None
        for line in self.request(cmd).split('\n'):
            # strip first, so that the 'ERROR:' prefix is cut at the right place
            line = line.strip()
            if line.startswith('ERROR:'):
                errors.append(line[6:].strip())
            elif reply is None:
                reply = line
            else:
                self.log.info('SEA: superfluous reply %r to %r', reply, cmd)
        if errors:
            raise HardwareError('; '.join(errors))
        return reply


SEA_TO_SECOPTYPE = {
    'float': FloatRange(),
    'text': StringType(),
    # parentheses are needed: '1 << 63 - 1' is evaluated as '1 << 62'
    'int': IntRange(-1 << 63, (1 << 63) - 1),
    'bool': BoolType(),
    'none': None,
    'floatvarar': ArrayOf(FloatRange(), 0, 400),  # 400 is the current limit for proper notify events in SEA
}


class SeaEnum(EnumType):
    """some sea enum nodes have text type -> accept '' also"""

    def copy(self):
        return SeaEnum(self._enum)

    def __call__(self, value):
        """convert to int before validation, any failure is a ReadFailedError"""
        try:
            value = int(value)
            return super().__call__(value)
        except Exception as e:
            raise ReadFailedError(e) from e


def get_datatype(paramdesc):
    """return the SECoP datatype for a SEA parameter description

    The result is None for the SEA type 'none'.

    :raises ValueError: on an unknown SEA type
    """
    typ = paramdesc['type']
    # False is the marker for 'not found', as None is a valid table entry
    result = SEA_TO_SECOPTYPE.get(typ, False)
    if result is not False:  # general case
        return result
    # special cases
    if typ == 'enum':
        return SeaEnum(paramdesc['enum'])
    raise ValueError('unknown SEA type %r' % typ)


class SeaModule(Module):
    """base class for modules built from a SEA object description

    A subclass with parameters and read_/write_ methods is created
    dynamically in __new__ for each module.
    """
    io = Attached()

    path2param = None
    sea_object = None
    hdbpath = None  # hdbpath for main writable

    # pylint: disable=too-many-statements,arguments-differ,too-many-branches
    def __new__(cls, name, logger, cfgdict, srv):
        """create an instance of a dynamically created subclass of cls

        The parameters are taken either from the json description of the
        configured sea_object, or, with single_module, from a parameter
        description stored before by a module with extra_modules.
        The following items are popped from cfgdict: json_file,
        visibility_level, single_module, sea_object, rel_paths and
        extra_modules.
        """
        if hasattr(srv, 'extra_sea_modules'):
            extra_modules = srv.extra_sea_modules
        else:
            extra_modules = {}
            srv.extra_sea_modules = extra_modules
        # unwrap {'value': ...} config items
        for k, v in cfgdict.items():
            try:
                cfgdict[k] = v['value']
            except (KeyError, TypeError):
                pass
        json_file = cfgdict.pop('json_file', None) or SeaClient.default_json_file[cfgdict['io']]
        visibility_level = cfgdict.pop('visibility_level', 2)

        drive_cmd = None
        single_module = cfgdict.pop('single_module', None)
        if single_module:
            sea_object, base, paramdesc = extra_modules[single_module]
            params = [paramdesc]
            paramdesc['key'] = 'value'
            if issubclass(cls, SeaWritable):  # and not SeaDrivable!
                if paramdesc.get('readonly', True):
                    raise ConfigError(f"{sea_object}/{paramdesc['path']} is not writable")
                params.insert(0, paramdesc.copy())  # copy value
                paramdesc['key'] = 'target'
                paramdesc['readonly'] = False
            extra_module_set = ()
            if not cfgdict.get('description'):
                cfgdict['description'] = f'{single_module}@{json_file}'
        else:
            sea_object = cfgdict.pop('sea_object')
            rel_paths = cfgdict.pop('rel_paths', None)
            # rel_paths:
            #   a list of sub nodes to look for parameters
            #   '.' denotes the main path
            #   Readable: the main value is taken from the first subpath
            #   Writable/Drivable:
            #     - read the target value: target
            #     - writing the target value: cmd from base path
            if not cfgdict.get('description'):
                cfgdict['description'] = '%s@%s%s' % (
                    name, json_file,
                    '' if rel_paths is None else f' (rel_paths={rel_paths})')

            with (seaconfig.dir / json_file).open(encoding='utf-8') as fp:
                content = json.load(fp)
            descr = content[sea_object]
            # filter by relative paths
            if rel_paths:
                result = {k: [] for k in rel_paths}
            else:
                params = []
            is_running = None
            target_param = None
            for paramdesc in descr['params']:
                path = paramdesc['path']
                pathlist = path.split('/')
                if pathlist[-1] == 'is_running' and issubclass(cls, Drivable):
                    # take this independent of visibility
                    is_running = paramdesc
                    continue
                if pathlist[-1] in ('target', 'targetValue') and issubclass(cls, Writable) and not target_param:
                    paramdesc['key'] = 'target'
                    paramdesc['readonly'] = False
                    target_param = paramdesc
                if path == '':
                    drive_cmd = paramdesc.get('cmd')
                elif paramdesc.get('visibility', 1) > visibility_level:
                    continue
                if rel_paths:
                    sub = path.split('/', 1)
                    sublist = result.get(sub[0])
                    if sublist is None:
                        sublist = result.get('.')
                        # take all else except subpaths with readonly node at top
                        if sublist is not None and (
                                path == '' or len(sub) == 1 and (
                                    paramdesc.get('kids', 0) == 0
                                    or not paramdesc.get('readonly', True))):
                            sublist.append(paramdesc)
                    else:
                        sublist.append(paramdesc)
                else:
                    params.append(paramdesc)
            if rel_paths:
                # flatten, keeping the order given by rel_paths
                params = [pdesc for sublist in result.values() for pdesc in sublist]
            if is_running:
                # take this at end
                params.append(is_running)
            descr['params'] = params
            main_value = params[0]
            if issubclass(cls, Readable):
                if 'key' in main_value:
                    raise ProgrammingError(f'main_value {main_value!r}')
                main_value['key'] = 'value'
            else:
                params.pop(0)
            base = descr['base']
            if issubclass(cls, SeaWritable):  # and not SeaDrivable!
                paramdesc = params[0]
                if paramdesc.get('key') != 'value':
                    raise ProgrammingError(f"key of first parameter of {name} must be 'value'")
                params.append(paramdesc.copy())  # copy value?
                if paramdesc.get('readonly', True):
                    raise ConfigError(f"{sea_object}/{paramdesc['path']} is not writable")
                paramdesc['key'] = 'target'
                paramdesc['readonly'] = False
            elif issubclass(cls, Drivable):
                if target_param:
                    if not drive_cmd:
                        drive_cmd = f'run {name}'
                        logger.warning('missing cmd in %s, use "run %s"', base, name)
                    target_param['cmd'] = drive_cmd
            extra_module_set = set(cfgdict.pop('extra_modules', ()))
        path2param = {}
        attributes = {'sea_object': sea_object, 'path2param': path2param}

        # some guesses about visibility (may be overriden in *_cfg.py):
        if sea_object in ('table', 'cc'):
            attributes['visibility'] = 2
        elif base.count('/') > 1:
            attributes['visibility'] = 2
        # check for ambiguous names. candidates are either the last item
        # of the path or the full path (underscore separated)
        duplicates = {k: [k] for k in cls.accessibles}
        for paramdesc in params:
            path = paramdesc['path']
            if path:
                pathlist = path.split('/')
                if 'key' not in paramdesc:
                    pname = pathlist[-1]
                    duplicates.setdefault(pname, pathlist)
        for paramdesc in params:
            path = paramdesc['path']
            readonly = paramdesc.get('readonly', True)
            dt = get_datatype(paramdesc)
            kwds = {'description': paramdesc.get('description', path),
                    'datatype': dt,
                    'visibility': paramdesc.get('visibility', 1),
                    'needscfg': False, 'readonly': readonly}
            if kwds['datatype'] is None:
                kwds.update(visibility=3, default='', datatype=StringType())
            pathlist = path.split('/') if path else []
            key = paramdesc.get('key')  # None, 'value' or 'target'
            if key is None:
                if len(pathlist) > 0:
                    if len(pathlist) == 1:
                        if issubclass(cls, Readable):
                            kwds['group'] = 'more_'
                    else:
                        kwds['group'] = pathlist[-2] + '_'
                    # take short name if unique
                    if duplicates[pathlist[-1]] == pathlist:
                        key = pathlist[-1]
                    else:
                        key = '_'.join(pathlist)
            if key == 'is_running':
                kwds['export'] = False
            # NOTE(review): the group is set to 'more_' above, so the
            # comparison with 'more' never matches - confirm the intention
            if key == 'target' and kwds.get('group') == 'more':
                kwds.pop('group')
            if key in cls.accessibles:
                if key == 'target':
                    kwds['readonly'] = False
                prev = cls.accessibles[key]
                if key == 'status':
                    # special case: status from sea is a string, not the status tuple
                    pobj = prev.copy()
                else:
                    pobj = Parameter(**kwds)
                    merged_properties = prev.propertyValues.copy()
                    pobj.updateProperties(merged_properties)
                    pobj.merge(merged_properties)
            else:
                pobj = Parameter(**kwds)
            datatype = pobj.datatype
            if issubclass(cls, SeaWritable) and key == 'target':
                kwds['readonly'] = False
                attributes['target'] = Parameter(**kwds)

            hdbpath = '/'.join([base] + pathlist)
            if key in extra_module_set:
                extra_modules[name + '.' + key] = sea_object, base, paramdesc
                continue  # skip this parameter
            if key is not None:
                path2param.setdefault(hdbpath, []).append((name, key))
                attributes[key] = pobj

            # the command is bound as default argument: each parameter gets
            # its own function (avoids late binding of the loop variables)
            def rfunc(self, cmd=f'hval {base}/{path}'):
                reply = self.io.query(cmd, True)
                try:
                    reply = float(reply)
                except ValueError:
                    pass
                # an updateEvent will be handled before above returns
                return reply

            rfunc.poll = False
            if key != 'status' and key is not None:
                attributes['read_' + key] = rfunc

            if not readonly:

                def wfunc(self, value, datatype=datatype, command=paramdesc['cmd']):
                    value = datatype.export_value(value)
                    if isinstance(value, bool):
                        value = int(value)
                        # TODO: check if more has to be done for valid tcl data (strings?)
                    cmd = f'{command} {value}'
                    self.io.query(cmd)
                    return Done

                attributes['write_' + (key or 'target')] = wfunc

        # create standard parameters like value and status, if not yet there
        for pname, pobj in cls.accessibles.items():
            if pname == 'pollinterval':
                pobj.export = False
                attributes[pname] = pobj
                pobj.__set_name__(cls, pname)
            elif pname not in attributes and isinstance(pobj, Parameter):
                pobj.needscfg = False
                attributes[pname] = pobj
                pobj.__set_name__(cls, pname)

        classname = f'{cls.__name__}_{name}'
        newcls = type(classname, (cls,), attributes)
        result = Module.__new__(newcls)
        return result

    def updateEvent(self, module, parameter, value, timestamp, readerror):
        """callback for updates from the SeaClient

        A method update_<parameter>(value, timestamp, readerror) takes
        precedence, when available. Else the update is announced directly.
        """
        upd = getattr(self, 'update_' + parameter, None)
        if upd:
            upd(value, timestamp, readerror)
            return
        self.announceUpdate(parameter, value, readerror, timestamp)

    def initModule(self):
        """register this module and its sea object on the io module"""
        self.io.register_obj(self, self.sea_object)
        super().initModule()


class SeaReadable(SeaModule, Readable):
    """readable SEA object"""
    _readerror = None
    _status = IDLE, ''

    status = Parameter(datatype=StatusType(Readable, 'DISABLED'))

    def update_value(self, value, timestamp, readerror):
        # make sure status is always ERROR when reading value fails
        self._readerror = readerror
        if readerror:
            self.read_status()  # forced ERROR status
            self.announceUpdate('value', value, readerror, timestamp)
        else:  # order is important
            self.value = value  # includes announceUpdate
            self.read_status()  # send event for ordinary self._status

    def update_status(self, value, timestamp, readerror):
        # value is the sea status, which is a string, not the SECoP status!
        if 'disable' in value.lower():
            self._status = DISABLED, value
        elif value == '':
            self._status = IDLE, ''
        else:
            self._status = ERROR, value
        self.read_status()

    def read_status(self):
        """return the status: a read error of value overrides the sea status"""
        if self._readerror:
            if 'disable' in str(self._readerror).lower():
                return DISABLED, str(self._readerror)
            return ERROR, f'{self._readerror.name} - {self._readerror}'
        return self._status

    def doPoll(self):
        self.read_status()


class SeaWritable(SeaReadable, Writable):
    """writable SEA object: value is identical to target"""

    def read_value(self):
        return self.target

    def update_target(self, value, timestamp, readerror):
        self.target = value
        if not readerror:
            self.value = value


class SeaDrivable(SeaReadable, Drivable):
    """drivable SEA object, started with the SEA 'run' command"""
    _is_driving = False
    _is_running = 0  # SEA is_running flag
    _bad_limit = 5
    _bad_start = 0  # is set to start time when is_running is not immediately true

    status = Parameter(datatype=StatusType(Drivable, 'DISABLED'))

    def write_target(self, value):
        """start driving with 'run <sea_object> <value>'"""
        self._is_driving = True
        if not self._is_running:
            # None: no is_running update seen yet since the start
            self._is_running = None
        self._bad_start = 0
        self.io.query(f'run {self.sea_object} {value}')
        if self._is_running:
            self.read_status()
        if not self.isDriving():
            self.log.warn('status is not set after run')
            self._bad_start = time.time()
            self.status = BUSY, 'changed target'
        return value

    def update_is_running(self, value, timestamp, readerror):
        """keep track of the SEA is_running flag and update the status"""
        if readerror:
            self.log.warn('error in update_is_running %r', readerror)
        else:
            self._is_running = value
            try:
                self.read_status()
            except Exception as e:
                self.log.info('read_status failed %r', e)

    def read_status(self):
        """combine the status of SeaReadable with the driving state"""
        status = super().read_status()
        if status[0] >= ERROR:
            if self._is_driving:
                return ERROR, 'BUSY + ' + status[1]
            return status
        if self._is_driving:
            if time.time() < self._bad_start + self._bad_limit:
                # within the grace period after a start without is_running
                if self._is_running:
                    self.log.warn('is_running flag delayed by %.2g sec', time.time() - self._bad_start)
                    self._bad_start = 0
                return BUSY, 'waiting for is_running'
            if self._is_running:
                self._bad_start = 0
                return BUSY, 'driving'
            if self._is_running is None:
                self.log.warn('miss is_running update within delay')
            self._is_driving = False
            if self._bad_start:
                self.log.warn('started, but not running')
        if self._bad_start:
            return IDLE, 'started, but not running'
        return IDLE, ''

    def update_target(self, *args):
        """take over a target update from SEA

        SeaModule.updateEvent calls this with (value, timestamp, readerror).
        The former argument list (module, parameter, value, timestamp,
        readerror) did not match that call; it is still accepted here.
        In both forms the value is the third item counted from the end.
        """
        # TODO: check if this is needed
        if len(args) not in (3, 5):
            raise TypeError(f'update_target expects 3 or 5 arguments, got {len(args)}')
        value = args[-3]
        if value is not None:
            self.target = value

    @Command()
    def stop(self):
        """propagate to SEA

        - on stdsct drivables this will call the halt script
        - on EaseDriv this will set the stopped state
        """
        self.io.query(f'{self.sea_object} is_running 0')