various improvements on the sea client
This commit is contained in:
parent
d3fc01689f
commit
a5cc98d0b0
247
secop_psi/sea.py
247
secop_psi/sea.py
@ -41,28 +41,25 @@ from os.path import expanduser, join, exists
|
||||
from secop.client import ProxyClient
|
||||
from secop.datatypes import ArrayOf, BoolType, \
|
||||
EnumType, FloatRange, IntRange, StringType
|
||||
from secop.errors import ConfigError, HardwareError, secop_error
|
||||
from secop.errors import ConfigError, HardwareError, secop_error, NoSuchModuleError
|
||||
from secop.lib import getGeneralConfig, mkthread
|
||||
from secop.lib.asynconn import AsynConn, ConnectionClosed
|
||||
from secop.modules import Attached, Command, Done, Drivable, \
|
||||
Module, Parameter, Readable, Writable
|
||||
Module, Parameter, Property, Readable, Writable
|
||||
from secop.protocol.dispatcher import make_update
|
||||
|
||||
CFG_HEADER = """[NODE]
|
||||
id = %(samenv)s.psi.ch
|
||||
description = %(samenv)s over SEA
|
||||
|
||||
[seaconn]
|
||||
CFG_HEADER = """[seaconn]
|
||||
class = secop_psi.sea.SeaClient
|
||||
description = a SEA connection
|
||||
description = SEA connection to %(samenv)s
|
||||
config = %(config)s
|
||||
%(export)s
|
||||
"""
|
||||
|
||||
CFG_MODULE = """
|
||||
[%(module)s]
|
||||
class = secop_psi.sea.%(modcls)s
|
||||
iodev = seaconn
|
||||
json_descr = %(descr)s
|
||||
remote_paths = .
|
||||
sea_object = %(module)s
|
||||
"""
|
||||
|
||||
|
||||
@ -95,6 +92,13 @@ class SeaClient(ProxyClient, Module):
|
||||
|
||||
uri = Parameter('hostname:portnumber', datatype=StringType(), default='localhost:5000')
|
||||
timeout = Parameter('timeout', datatype=FloatRange(0), default=10)
|
||||
config = Property("""needed SEA configuration, space separated
|
||||
|
||||
Example: "ori4.config ori4.stick"
|
||||
""", StringType(), default='')
|
||||
service = Property("main/stick/addons", StringType(), default='')
|
||||
visibility = 'expert'
|
||||
default_json_file = {}
|
||||
|
||||
def __init__(self, name, log, opts, srv):
|
||||
instance = srv.node_cfg['name'].rsplit('_', 1)[0]
|
||||
@ -107,6 +111,9 @@ class SeaClient(ProxyClient, Module):
|
||||
self.shutdown = False
|
||||
self.path2param = {}
|
||||
self._write_lock = threading.Lock()
|
||||
config = opts.get('config')
|
||||
if config:
|
||||
self.default_json_file[name] = config.split()[0] + '.json'
|
||||
self.io = None
|
||||
ProxyClient.__init__(self)
|
||||
Module.__init__(self, name, log, opts, srv)
|
||||
@ -126,8 +133,10 @@ class SeaClient(ProxyClient, Module):
|
||||
assert self.asyncio.readline() == b'OK'
|
||||
self.asyncio.writeline(b'Spy 007')
|
||||
assert self.asyncio.readline() == b'Login OK'
|
||||
# the json protocol is better for updates
|
||||
self.asyncio.writeline(b'protocol set json')
|
||||
self.request('frappy_config %s %s' % (self.service, self.config))
|
||||
|
||||
# frappy_async_client switches to the json protocol (better for updates)
|
||||
self.asyncio.writeline(b'frappy_async_client')
|
||||
self.asyncio.writeline(('get_all_param ' + ' '.join(self.objects)).encode())
|
||||
mkthread(self._rxthread, started_callback)
|
||||
|
||||
@ -141,7 +150,9 @@ class SeaClient(ProxyClient, Module):
|
||||
assert self.io.readline() == b'OK'
|
||||
self.io.writeline(b'seauser seaser')
|
||||
assert self.io.readline() == b'Login OK'
|
||||
print('connected to %s' % self.uri)
|
||||
self.io.flush_recv()
|
||||
# print('> %s' % command)
|
||||
self.io.writeline(('fulltransact %s' % command).encode())
|
||||
result = None
|
||||
deadline = time.time() + 10
|
||||
@ -153,6 +164,7 @@ class SeaClient(ProxyClient, Module):
|
||||
except ConnectionClosed:
|
||||
break
|
||||
reply = reply.decode()
|
||||
# print('< %s' % reply)
|
||||
if reply.startswith('TRANSACTIONSTART'):
|
||||
result = []
|
||||
continue
|
||||
@ -207,7 +219,7 @@ class SeaClient(ProxyClient, Module):
|
||||
started_callback = None
|
||||
continue
|
||||
if flag != 'hdbevent':
|
||||
if obj != 'protocol':
|
||||
if obj != 'frappy_async_client':
|
||||
print('SKIP', msg)
|
||||
continue
|
||||
if data is None:
|
||||
@ -223,6 +235,15 @@ class SeaClient(ProxyClient, Module):
|
||||
try:
|
||||
module, param = self.path2param[path]
|
||||
except KeyError:
|
||||
if path.startswith('/device'):
|
||||
if path == '/device/changetime':
|
||||
result = self.request('check_config %s %s' % (self.service, self.config))
|
||||
if result == '1':
|
||||
self.asyncio.writeline(('get_all_param ' + ' '.join(self.objects)).encode())
|
||||
else:
|
||||
self.DISPATCHER.shutdown()
|
||||
elif path.startswith('/device/frappy_%s' % self.service) and value == '':
|
||||
self.DISPATCHER.shutdown()
|
||||
# print('UNUSED', msg)
|
||||
continue # unused parameter
|
||||
oldv, oldt, oldr = self.cache.get((module, param), [None, None, None])
|
||||
@ -232,7 +253,7 @@ class SeaClient(ProxyClient, Module):
|
||||
# do not update unchanged values within 0.1 sec
|
||||
self.updateValue(module, param, value, now, readerror)
|
||||
|
||||
@Command
|
||||
@Command(StringType(), result=StringType())
|
||||
def communicate(self, command):
|
||||
"""send a command to SEA"""
|
||||
reply = self.request(command)
|
||||
@ -245,26 +266,54 @@ class SeaClient(ProxyClient, Module):
|
||||
reply = ''.join('' if line.startswith('WARNING') else line for line in reply.split('\n'))
|
||||
samenv, reply = json.loads(reply)
|
||||
samenv = samenv.replace('/', '_')
|
||||
result = []
|
||||
with open(join(seaconfdir, samenv + '.cfg'), 'w') as cfp:
|
||||
cfp.write(CFG_HEADER % dict(samenv=samenv))
|
||||
modules = {}
|
||||
modcls = {}
|
||||
for filename, obj, descr in reply:
|
||||
content = json.dumps([obj, descr]).replace('}, {', '},\n{')
|
||||
if filename not in modules:
|
||||
modules[filename] = {}
|
||||
if descr['params'][0].get('cmd', '').startswith('run '):
|
||||
modcls[obj] = 'SeaDrivable'
|
||||
elif not descr['params'][0].get('readonly', True):
|
||||
modcls[obj] = 'SeaWritable'
|
||||
else:
|
||||
modcls[obj] = 'SeaReadable'
|
||||
modules.setdefault(filename, {})[obj] = descr
|
||||
|
||||
result = []
|
||||
for filename, descr in modules.items():
|
||||
stripped = filename.rpartition('.')[0]
|
||||
with open(join(seaconfdir, stripped + '.cfg'), 'w') as fp:
|
||||
fp.write(CFG_HEADER % dict(samenv=samenv, config=filename,
|
||||
export='' if filename.endswith('.config') else 'export=False'))
|
||||
for obj in descr:
|
||||
fp.write(CFG_MODULE % dict(modcls=modcls[obj], module=obj))
|
||||
content = json.dumps(descr).replace('}, {', '},\n{')
|
||||
with open(join(seaconfdir, filename + '.json'), 'w') as fp:
|
||||
fp.write(content + '\n')
|
||||
if descr[0].get('cmd', '').startswith('run '):
|
||||
modcls = 'SeaDrivable'
|
||||
result.append('%s: %s' % (filename, ','.join(n for n in descr)))
|
||||
return '; '.join(result)
|
||||
|
||||
@Command(StringType(), result=StringType())
|
||||
def query(self, cmd):
|
||||
"""a request checking for errors and accepting 0 or 1 line as result"""
|
||||
errors = []
|
||||
reply = None
|
||||
for line in self.request(cmd).split('\n'):
|
||||
if line.strip().startswith('ERROR:'):
|
||||
errors.append(line[6:].strip())
|
||||
elif reply is None:
|
||||
reply = line.strip()
|
||||
else:
|
||||
modcls = 'SeaReadable'
|
||||
cfp.write(CFG_MODULE % dict(modcls=modcls, module=obj, descr=filename))
|
||||
result.append(filename)
|
||||
return '\n'.join(result)
|
||||
raise HardwareError('SEA: superfluous reply %r to %r' % (reply, cmd))
|
||||
if errors:
|
||||
raise HardwareError('; '.join(errors))
|
||||
return reply
|
||||
|
||||
|
||||
SEA_TO_SECOPTYPE = {
|
||||
'float': FloatRange(),
|
||||
'text': StringType(),
|
||||
'int': IntRange(),
|
||||
'int': IntRange(-1 << 63, 1 << 63 - 1),
|
||||
'bool': BoolType(),
|
||||
'none': None,
|
||||
'floatvarar': ArrayOf(FloatRange(), 0, 400), # 400 is the current limit for proper notify events in SEA
|
||||
@ -288,21 +337,55 @@ class SeaModule(Module):
|
||||
# pollerClass=None
|
||||
path2param = None
|
||||
sea_object = None
|
||||
hdbpath = None # hdbpath for main writable
|
||||
|
||||
def __new__(cls, name, logger, cfgdict, dispatcher):
|
||||
def __new__(cls, name, logger, cfgdict, srv):
|
||||
if hasattr(srv, 'extra_sea_modules'):
|
||||
extra_modules = srv.extra_sea_modules
|
||||
else:
|
||||
extra_modules = {}
|
||||
srv.extra_sea_modules = extra_modules
|
||||
json_file = cfgdict.pop('json_file', None) or SeaClient.default_json_file[cfgdict['iodev']]
|
||||
visibility_level = cfgdict.pop('visibility_level', 2)
|
||||
json_descr = cfgdict.pop('json_descr')
|
||||
remote_paths = cfgdict.pop('remote_paths', '')
|
||||
|
||||
single_module = cfgdict.pop('single_module', None)
|
||||
if single_module:
|
||||
sea_object, base, paramdesc = extra_modules[single_module]
|
||||
params = [paramdesc]
|
||||
paramdesc['key'] = 'value'
|
||||
if issubclass(cls, SeaWritable):
|
||||
if paramdesc.get('readonly', True):
|
||||
raise ConfigError('%s is not writable' % sea_object)
|
||||
targetdesc = dict(paramdesc, key='target')
|
||||
params.append(targetdesc)
|
||||
paramdesc['readonly'] = True
|
||||
# print('SINGLE %s/%s %s %r' % (base, paramdesc['path'], cls.__name__, params))
|
||||
extra_module_set = ()
|
||||
if 'description' not in cfgdict:
|
||||
cfgdict['description'] = '%s (remote_paths=%s)' % (json_descr, remote_paths)
|
||||
with open(join(seaconfdir, json_descr + '.json')) as fp:
|
||||
sea_object, descr = json.load(fp)
|
||||
remote_paths = remote_paths.split()
|
||||
if remote_paths:
|
||||
cfgdict['description'] = '%s@%s' % (single_module, json_file)
|
||||
else:
|
||||
sea_object = cfgdict.pop('sea_object')
|
||||
rel_paths = cfgdict.pop('rel_paths', '.')
|
||||
if 'description' not in cfgdict:
|
||||
cfgdict['description'] = '%s@%s%s' % (
|
||||
name, json_file, '' if rel_paths == '.' else ' (rel_paths=%s)' % rel_paths)
|
||||
|
||||
# with open(join(seaconfdir, json_file + '.json')) as fp:
|
||||
# sea_object, descr = json.load(fp)
|
||||
with open(join(seaconfdir, json_file)) as fp:
|
||||
descr = json.load(fp)[sea_object]
|
||||
if rel_paths == '*' or not rel_paths:
|
||||
# take all
|
||||
main = descr['params'][0]
|
||||
# assert main['path'] == '' # TODO: check cases where this fails
|
||||
main['key'] = 'value'
|
||||
else:
|
||||
# filter by relative paths
|
||||
rel_paths = rel_paths.split()
|
||||
result = []
|
||||
for rpath in remote_paths:
|
||||
for rpath in rel_paths:
|
||||
include = True
|
||||
for paramdesc in descr:
|
||||
for paramdesc in descr['params']:
|
||||
path = paramdesc['path']
|
||||
if paramdesc.get('visibility', 1) > visibility_level:
|
||||
if not path.endswith('is_running'):
|
||||
@ -315,15 +398,25 @@ class SeaModule(Module):
|
||||
result.append(paramdesc)
|
||||
elif sub[0] == rpath:
|
||||
result.append(paramdesc)
|
||||
descr = result
|
||||
main = remote_paths[0]
|
||||
if main == '.':
|
||||
main = ''
|
||||
else: # take all
|
||||
main = ''
|
||||
descr['params'] = result
|
||||
if result[0]['path'] != '':
|
||||
pass # TODO: check these cases
|
||||
result[0]['key'] = 'value'
|
||||
logger.info('PARAMS %s %r', name, result)
|
||||
base = descr['base']
|
||||
params = descr['params']
|
||||
extra_module_set = cfgdict.pop('extra_modules', ())
|
||||
if extra_module_set:
|
||||
extra_module_set = set(extra_module_set.replace(',', ' ').split())
|
||||
path2param = {}
|
||||
attributes = dict(sea_object=sea_object, path2param=path2param)
|
||||
for paramdesc in descr:
|
||||
|
||||
# some guesses about visibility (may be overriden in *.cfg):
|
||||
if sea_object in ('table', 'cc'):
|
||||
attributes['visibility'] = 2
|
||||
elif base.count('/') > 1:
|
||||
attributes['visibility'] = 2
|
||||
for paramdesc in params:
|
||||
path = paramdesc['path']
|
||||
readonly = paramdesc.get('readonly', True)
|
||||
dt = get_datatype(paramdesc)
|
||||
@ -334,9 +427,8 @@ class SeaModule(Module):
|
||||
if kwds['datatype'] is None:
|
||||
kwds.update(visibility=3, default='', datatype=StringType())
|
||||
pathlist = path.split('/') if path else []
|
||||
if path == main:
|
||||
key = 'value'
|
||||
else:
|
||||
key = paramdesc.get('key') # will be None, 'value' or 'target'
|
||||
if key is None:
|
||||
if len(pathlist) > 0:
|
||||
if len(pathlist) == 1:
|
||||
kwds['group'] = 'more'
|
||||
@ -350,7 +442,6 @@ class SeaModule(Module):
|
||||
break
|
||||
if key == 'is_running':
|
||||
kwds['export'] = False
|
||||
path2param['/'.join(['', sea_object] + pathlist)] = (name, key)
|
||||
if key in cls.accessibles:
|
||||
if key == 'target':
|
||||
kwds['readonly'] = False
|
||||
@ -359,14 +450,20 @@ class SeaModule(Module):
|
||||
else:
|
||||
pobj = Parameter(**kwds)
|
||||
datatype = pobj.datatype
|
||||
hdbpath = '/'.join([base] + pathlist)
|
||||
if key in extra_module_set:
|
||||
extra_modules[name + '.' + key] = sea_object, base, paramdesc
|
||||
continue # skip this parameter
|
||||
path2param[hdbpath] = (name, key)
|
||||
logger.info('PARAM %s %s %s', hdbpath, name, key)
|
||||
attributes[key] = pobj
|
||||
if not hasattr(cls, 'read_' + key):
|
||||
# if hasattr(cls, 'read_' + key):
|
||||
# print('override %s.read_%s' % (cls.__name__, key))
|
||||
|
||||
def rfunc(self, cmd='hval /sics/%s/%s' % (sea_object, path)):
|
||||
print('READ', cmd)
|
||||
reply = self._iodev.request(cmd)
|
||||
reply = self._iodev.query(cmd)
|
||||
print('REPLY', reply)
|
||||
if reply.startswith('ERROR: '):
|
||||
raise HardwareError(reply.split(' ', 1)[1])
|
||||
try:
|
||||
reply = float(reply)
|
||||
except ValueError:
|
||||
@ -376,15 +473,18 @@ class SeaModule(Module):
|
||||
|
||||
attributes['read_' + key] = rfunc
|
||||
|
||||
if not (readonly or hasattr(cls, 'write_' + key)):
|
||||
# pylint wrongly complains 'Cell variable pobj defined in loop'
|
||||
# pylint: disable=cell-var-from-loop
|
||||
if not readonly:
|
||||
if hasattr(cls, 'write_' + key):
|
||||
print('override %s.write_%s' % (cls.__name__, key))
|
||||
|
||||
def wfunc(self, value, datatype=datatype, command=paramdesc['cmd']):
|
||||
# TODO: convert to valid tcl data
|
||||
cmd = "%s %s" % (command, datatype.export_value(value))
|
||||
print('WRITE', cmd)
|
||||
self._iodev.request(cmd)
|
||||
# an updateEvent will be handled before above returns
|
||||
value = datatype.export_value(value)
|
||||
if isinstance(value, bool):
|
||||
value = int(value)
|
||||
# TODO: check if more has to be done for valid tcl data (strings?)
|
||||
cmd = "%s %s" % (command, value)
|
||||
self._iodev.query(cmd)
|
||||
print('WRITE %s' % cmd)
|
||||
return Done
|
||||
|
||||
attributes['write_' + key] = wfunc
|
||||
@ -397,18 +497,20 @@ class SeaModule(Module):
|
||||
attributes[pname] = pobj.override(poll=False, needscfg=False)
|
||||
|
||||
classname = '%s_%s' % (cls.__name__, sea_object)
|
||||
# print(name, attributes)
|
||||
newcls = type(classname, (cls,), attributes)
|
||||
return Module.__new__(newcls)
|
||||
|
||||
# def __init__(self, name, logger, cfgdict, dispatcher):
|
||||
# Module.__init__(self, name, logger, cfgdict, dispatcher)
|
||||
|
||||
def updateEvent(self, module, parameter, value, timestamp, readerror):
|
||||
upd = getattr(self, 'update_' + parameter, None)
|
||||
if upd:
|
||||
upd(value, timestamp, readerror)
|
||||
return
|
||||
try:
|
||||
pobj = self.parameters[parameter]
|
||||
except KeyError:
|
||||
print(self.name, parameter)
|
||||
raise
|
||||
pobj.timestamp = timestamp
|
||||
#if not pobj.readonly and pobj.value != value:
|
||||
# print('UPDATE', module, parameter, value)
|
||||
@ -422,9 +524,6 @@ class SeaModule(Module):
|
||||
pobj.readerror = readerror
|
||||
self.DISPATCHER.broadcast_event(make_update(self.name, pobj))
|
||||
|
||||
#def earlyInit(self):
|
||||
# self.path2param = {k % subst: v for k, v in self.path2param.items()}
|
||||
|
||||
def initModule(self):
|
||||
self._iodev.register_obj(self, self.sea_object)
|
||||
super().initModule()
|
||||
@ -445,25 +544,26 @@ class SeaReadable(SeaModule, Readable):
|
||||
|
||||
|
||||
class SeaWritable(SeaModule, Writable):
|
||||
pass
|
||||
def read_value(self):
|
||||
return self.target
|
||||
|
||||
def update_target(self, value, timestamp, readerror):
|
||||
if not readerror:
|
||||
self.value = value
|
||||
|
||||
|
||||
class SeaDrivable(SeaModule, Drivable):
|
||||
_sea_status = ''
|
||||
_is_running = 0
|
||||
|
||||
#def buildParams(self, cfgdict, name):
|
||||
# # insert here special treatment for status and target
|
||||
# super().buildParams(cfgdict)
|
||||
|
||||
def read_status(self):
|
||||
return self.status
|
||||
|
||||
def read_target(self):
|
||||
return self.target
|
||||
# def read_target(self):
|
||||
# return self.target
|
||||
|
||||
def write_target(self, value):
|
||||
self._iodev.request('run %s %s' % (self.sea_object, value))
|
||||
self._iodev.query('run %s %s' % (self.sea_object, value))
|
||||
#self.status = [self.Status.BUSY, 'driving']
|
||||
return value
|
||||
|
||||
@ -488,3 +588,6 @@ class SeaDrivable(SeaModule, Drivable):
|
||||
def updateTarget(self, module, parameter, value, timestamp, readerror):
|
||||
if value is not None:
|
||||
self.target = value
|
||||
|
||||
def stop(self):
|
||||
self._iodev.query('%s is_running 0' % self.sea_object)
|
Loading…
x
Reference in New Issue
Block a user