sea: fix parameter name mapping

- rel_path = ['tm', '.', 'set'] should mean:

'tm': tm parameters first, with /<obj?/tm as main value
'.': then all parameters directly at top level, except 'set'
'set': all parameters  below 'set'
driving happens at object level

- better name mangling (the 1st appearance of the same shortname
  is kept short)
This commit is contained in:
l_samenv
2024-11-28 18:06:14 +01:00
parent ede07e266c
commit ddc72d0ea7
27 changed files with 172 additions and 102 deletions

View File

@ -40,7 +40,8 @@ from frappy.client import ProxyClient
from frappy.datatypes import ArrayOf, BoolType, \
EnumType, FloatRange, IntRange, StringType, StatusType
from frappy.core import IDLE, BUSY, WARN, ERROR, DISABLED
from frappy.errors import ConfigError, HardwareError, ReadFailedError, CommunicationFailedError
from frappy.errors import ConfigError, HardwareError, ReadFailedError, \
CommunicationFailedError, ProgrammingError
from frappy.lib import generalConfig, mkthread, lazy_property
from frappy.lib.asynconn import AsynConn, ConnectionClosed
from frappy.modulebase import Done
@ -473,81 +474,105 @@ class SeaModule(Module):
json_file = cfgdict.pop('json_file', None) or SeaClient.default_json_file[cfgdict['io']]
visibility_level = cfgdict.pop('visibility_level', 2)
drive_cmd = None
single_module = cfgdict.pop('single_module', None)
if single_module:
sea_object, base, paramdesc = extra_modules[single_module]
params = [paramdesc]
paramdesc['key'] = 'value'
if issubclass(cls, SeaWritable):
if issubclass(cls, SeaWritable): # and not SeaDrivable!
if paramdesc.get('readonly', True):
raise ConfigError(f"{sea_object}/{paramdesc['path']} is not writable")
params.insert(0, paramdesc.copy()) # copy value
paramdesc['key'] = 'target'
paramdesc['readonly'] = False
extra_module_set = ()
if 'description' not in cfgdict:
if not cfgdict.get('description'):
cfgdict['description'] = f'{single_module}@{json_file}'
else:
sea_object = cfgdict.pop('sea_object')
rel_paths = cfgdict.pop('rel_paths', '.')
if 'description' not in cfgdict:
rel_paths = cfgdict.pop('rel_paths', None)
# rel_paths:
# a list of sub nodes to look for parameters
# '.' denotes the main path
# Readable: the main value is taken from the first subpath
# Writable/Drivable:
# - read the target value: <sicsobj> target
# - writing the target value: cmd from base path
if not cfgdict.get('description'):
cfgdict['description'] = '%s@%s%s' % (
name, json_file, '' if rel_paths == '.' else f' (rel_paths={rel_paths})')
name, json_file, '' if rel_paths is None else f' (rel_paths={rel_paths})')
with (seaconfig.dir / json_file).open(encoding='utf-8') as fp:
content = json.load(fp)
descr = content[sea_object]
if rel_paths == '*' or not rel_paths:
# take all
main = descr['params'][0]
if issubclass(cls, Readable):
# assert main['path'] == '' # TODO: check cases where this fails
main['key'] = 'value'
else:
descr['params'].pop(0)
else:
if True:
# filter by relative paths
result = []
is_running = None
for rpath in rel_paths:
include = True
for paramdesc in descr['params']:
path = paramdesc['path']
if path.endswith('is_running') and issubclass(cls, Drivable):
# take this independent of visibility
is_running = paramdesc
continue
if paramdesc.get('visibility', 1) > visibility_level:
continue
sub = path.split('/', 1)
if rpath == '.': # take all except subpaths with readonly node at top
if len(sub) == 1:
include = paramdesc.get('kids', 0) == 0 or not paramdesc.get('readonly', True)
if include or path == '':
result.append(paramdesc)
elif sub[0] == rpath:
result.append(paramdesc)
if is_running: # take this at end
result.append(is_running)
descr['params'] = result
rel0 = '' if rel_paths[0] == '.' else rel_paths[0]
if result[0]['path'] == rel0:
if issubclass(cls, Readable):
result[0]['key'] = 'value'
else:
result.pop(0)
if rel_paths:
result = {k: [] for k in rel_paths}
else:
logger.error('%s: no value found', name)
params = []
is_running = None
target_param = None
for paramdesc in descr['params']:
path = paramdesc['path']
pathlist = path.split('/')
if pathlist[-1] == 'is_running' and issubclass(cls, Drivable):
# take this independent of visibility
is_running = paramdesc
continue
if pathlist[-1] in ('target', 'targetValue') and issubclass(cls, Writable) and not target_param:
paramdesc['key'] = 'target'
paramdesc['readonly'] = False
target_param = paramdesc
if path == '':
drive_cmd = paramdesc.get('cmd')
elif paramdesc.get('visibility', 1) > visibility_level:
continue
if rel_paths:
sub = path.split('/', 1)
sublist = result.get(sub[0])
if sublist is None:
sublist = result.get('.')
# take all else except subpaths with readonly node at top
if sublist is not None and (
path == '' or len(sub) == 1 and (
paramdesc.get('kids', 0) == 0
or not paramdesc.get('readonly', True))):
sublist.append(paramdesc)
else:
sublist.append(paramdesc)
else:
params.append(paramdesc)
if rel_paths:
params = sum(result.values(), [])
if is_running: # take this at end
params.append(is_running)
descr['params'] = params
main_value = params[0]
if issubclass(cls, Readable):
if 'key' in main_value:
raise ProgrammingError(f'main_value {main_value!r}')
main_value['key'] = 'value'
else:
params.pop(0)
base = descr['base']
params = descr['params']
if issubclass(cls, SeaWritable):
if issubclass(cls, SeaWritable): # and not SeaDrivable!
paramdesc = params[0]
assert paramdesc['key'] == 'value'
if paramdesc.get('key') != 'value':
raise ProgrammingError(f"key of first parameter of {name} must be 'value'")
params.append(paramdesc.copy()) # copy value?
if paramdesc.get('readonly', True):
raise ConfigError(f"{sea_object}/{paramdesc['path']} is not writable")
paramdesc['key'] = 'target'
paramdesc['readonly'] = False
elif issubclass(cls, Drivable):
if target_param:
if not drive_cmd:
drive_cmd = f'run {name}'
logger.warning('missing cmd in %s, use "run %s"', base, name)
target_param['cmd'] = drive_cmd
extra_module_set = set(cfgdict.pop('extra_modules', ()))
path2param = {}
attributes = {'sea_object': sea_object, 'path2param': path2param}
@ -559,14 +584,14 @@ class SeaModule(Module):
attributes['visibility'] = 2
# check for ambiguous names. candidates are either the last item
# of the path or the full path (underscore separated)
simple_names = {k: 1 for k in cls.accessibles}
duplicates = {k: [k] for k in cls.accessibles}
for paramdesc in params:
path = paramdesc['path']
if path:
pathlist = path.split('/')
if 'key' not in paramdesc:
pname = pathlist[-1]
simple_names[pname] = simple_names.get(pname, 0) + 1
duplicates.setdefault(pname, pathlist)
for paramdesc in params:
path = paramdesc['path']
readonly = paramdesc.get('readonly', True)
@ -583,11 +608,11 @@ class SeaModule(Module):
if len(pathlist) > 0:
if len(pathlist) == 1:
if issubclass(cls, Readable):
kwds['group'] = 'more'
kwds['group'] = 'more_'
else:
kwds['group'] = pathlist[-2]
kwds['group'] = pathlist[-2] + '_'
# take short name if unique
if simple_names[pathlist[-1]] == 1:
if duplicates[pathlist[-1]] == pathlist:
key = pathlist[-1]
else:
key = '_'.join(pathlist)
@ -612,14 +637,15 @@ class SeaModule(Module):
datatype = pobj.datatype
if issubclass(cls, SeaWritable) and key == 'target':
kwds['readonly'] = False
attributes['value'] = Parameter(**kwds)
attributes['target'] = Parameter(**kwds)
hdbpath = '/'.join([base] + pathlist)
if key in extra_module_set:
extra_modules[name + '.' + key] = sea_object, base, paramdesc
continue # skip this parameter
path2param.setdefault(hdbpath, []).append((name, key))
attributes[key] = pobj
if key is not None:
path2param.setdefault(hdbpath, []).append((name, key))
attributes[key] = pobj
def rfunc(self, cmd=f'hval {base}/{path}'):
reply = self.io.query(cmd, True)
@ -631,7 +657,7 @@ class SeaModule(Module):
return reply
rfunc.poll = False
if key != 'status':
if key != 'status' and key is not None:
attributes['read_' + key] = rfunc
if not readonly:
@ -645,7 +671,7 @@ class SeaModule(Module):
self.io.query(cmd)
return Done
attributes['write_' + key] = wfunc
attributes['write_' + (key or 'target')] = wfunc
# create standard parameters like value and status, if not yet there
for pname, pobj in cls.accessibles.items():
@ -659,7 +685,11 @@ class SeaModule(Module):
pobj.__set_name__(cls, pname)
classname = f'{cls.__name__}_{name}'
newcls = type(classname, (cls,), attributes)
try:
newcls = type(classname, (cls,), attributes)
except Exception as e:
raise
# newcls = type(classname, (cls,), attributes)
result = Module.__new__(newcls)
return result