frappy_psi.sea: more improvements

- add sea_path property
- add LscDrivable (config of these modules is easier to understand)

Change-Id: I616dc94de6e784f6d8cfcf080d9a8408cbf73d93
This commit is contained in:
zolliker 2024-12-03 15:12:02 +01:00
parent ddc72d0ea7
commit 654a472a7e

View File

@ -36,7 +36,7 @@ import time
import os
from pathlib import Path
from frappy.client import ProxyClient
from frappy.client import ProxyClient, CacheItem
from frappy.datatypes import ArrayOf, BoolType, \
EnumType, FloatRange, IntRange, StringType, StatusType
from frappy.core import IDLE, BUSY, WARN, ERROR, DISABLED
@ -333,9 +333,10 @@ class SeaClient(ProxyClient, Module):
self.secNode.srv.shutdown()
else:
for module, param in mplist:
oldv, oldt, oldr = self.cache.get((module, param), [None, None, None])
oldv, oldt, oldr = self.cache[module, param]
if value is None:
value = oldv
self.cache[module, param] = CacheItem(value, now, readerror)
if value != oldv or str(readerror) != str(oldr) or abs(now - oldt) > 60:
# do not update unchanged values within 60 sec
self.updateValue(module, param, value, now, readerror)
@ -452,13 +453,19 @@ def get_datatype(paramdesc):
raise ValueError('unknown SEA type %r' % typ)
def get_cfg(cfgdict, *args):
result = cfgdict.get(*args)
return result['value'] if isinstance(result, dict) else result
def pop_cfg(cfgdict, *args):
result = cfgdict.pop(*args)
return result['value'] if isinstance(result, dict) else result
class SeaModule(Module):
io = Attached()
path2param = None
sea_object = None
hdbpath = None # hdbpath for main writable
# pylint: disable=too-many-statements,arguments-differ,too-many-branches
def __new__(cls, name, logger, cfgdict, srv):
if hasattr(srv, 'extra_sea_modules'):
@ -466,16 +473,11 @@ class SeaModule(Module):
else:
extra_modules = {}
srv.extra_sea_modules = extra_modules
for k, v in cfgdict.items():
try:
cfgdict[k] = v['value']
except (KeyError, TypeError):
pass
json_file = cfgdict.pop('json_file', None) or SeaClient.default_json_file[cfgdict['io']]
visibility_level = cfgdict.pop('visibility_level', 2)
drive_cmd = None
single_module = cfgdict.pop('single_module', None)
json_file = pop_cfg(cfgdict, 'json_file', None) or SeaClient.default_json_file[get_cfg(cfgdict, 'io')]
visibility_level = pop_cfg(cfgdict, 'visibility_level', 2)
single_module = pop_cfg(cfgdict, 'single_module', None)
if single_module:
sea_object, base, paramdesc = extra_modules[single_module]
params = [paramdesc]
@ -487,68 +489,65 @@ class SeaModule(Module):
paramdesc['key'] = 'target'
paramdesc['readonly'] = False
extra_module_set = ()
if not cfgdict.get('description'):
if not get_cfg(cfgdict, 'description'):
cfgdict['description'] = f'{single_module}@{json_file}'
else:
sea_object = cfgdict.pop('sea_object')
rel_paths = cfgdict.pop('rel_paths', None)
sea_object = pop_cfg(cfgdict, 'sea_object', None)
sea_path = pop_cfg(cfgdict, 'sea_path', None)
if sea_object:
if sea_path:
raise ConfigError(f'module {name}: superfluous sea_object property (sea_path is given)')
sea_path = sea_object
rel_paths = get_cfg(cfgdict, 'rel_paths', None)
if rel_paths is None:
sea_object, *rel_paths = sea_path.split('/', 1)
if not rel_paths:
rel_paths = None
else:
if '/' in sea_path:
raise ConfigError(f'module {name}: superfluous rel_paths property (sea_path is given)')
sea_object = sea_path
# rel_paths:
# a list of sub nodes to look for parameters
# '.' denotes the main path
# Readable: the main value is taken from the first subpath
# Writable/Drivable:
# Writable:
# - read the target value: <sicsobj> target
# - writing the target value: cmd from base path
if not cfgdict.get('description'):
# - write target value: command from first subpath
# Drivable:
# - write target value: run <sea_object> <target>
if not get_cfg(cfgdict, 'description'):
cfgdict['description'] = '%s@%s%s' % (
name, json_file, '' if rel_paths is None else f' (rel_paths={rel_paths})')
with (seaconfig.dir / json_file).open(encoding='utf-8') as fp:
content = json.load(fp)
descr = content[sea_object]
if True:
# filter by relative paths
if rel_paths:
result = {k: [] for k in rel_paths}
# filter by relative paths
if rel_paths:
result = {k: [] for k in rel_paths}
else:
result = {True: []}
is_running = None
for paramdesc in descr['params']:
path = paramdesc['path']
pathlist = path.split('/')
if pathlist[-1] == 'is_running' and issubclass(cls, Drivable):
# take this independent of visibility
is_running = paramdesc
continue
if paramdesc.get('visibility', 1) > visibility_level:
continue
if rel_paths is None:
result[True].append(paramdesc)
else:
params = []
is_running = None
target_param = None
for paramdesc in descr['params']:
path = paramdesc['path']
pathlist = path.split('/')
if pathlist[-1] == 'is_running' and issubclass(cls, Drivable):
# take this independent of visibility
is_running = paramdesc
continue
if pathlist[-1] in ('target', 'targetValue') and issubclass(cls, Writable) and not target_param:
paramdesc['key'] = 'target'
paramdesc['readonly'] = False
target_param = paramdesc
if path == '':
drive_cmd = paramdesc.get('cmd')
elif paramdesc.get('visibility', 1) > visibility_level:
continue
if rel_paths:
sub = path.split('/', 1)
sublist = result.get(sub[0])
if sublist is None:
sublist = result.get('.')
# take all else except subpaths with readonly node at top
if sublist is not None and (
path == '' or len(sub) == 1 and (
paramdesc.get('kids', 0) == 0
or not paramdesc.get('readonly', True))):
sublist.append(paramdesc)
else:
sublist.append(paramdesc)
else:
params.append(paramdesc)
if rel_paths:
params = sum(result.values(), [])
if is_running: # take this at end
params.append(is_running)
descr['params'] = params
cls.paramFilter(result, paramdesc)
cfgdict.pop('rel_paths', None)
params = sum(result.values(), [])
if is_running: # take this at end
params.append(is_running)
main_value = params[0]
if issubclass(cls, Readable):
if 'key' in main_value:
@ -567,13 +566,7 @@ class SeaModule(Module):
paramdesc['key'] = 'target'
paramdesc['readonly'] = False
elif issubclass(cls, Drivable):
if target_param:
if not drive_cmd:
drive_cmd = f'run {name}'
logger.warning('missing cmd in %s, use "run %s"', base, name)
target_param['cmd'] = drive_cmd
extra_module_set = set(cfgdict.pop('extra_modules', ()))
extra_module_set = set(pop_cfg(cfgdict, 'extra_modules', ()))
path2param = {}
attributes = {'sea_object': sea_object, 'path2param': path2param}
@ -620,24 +613,33 @@ class SeaModule(Module):
kwds['export'] = False
if key == 'target' and kwds.get('group') == 'more':
kwds.pop('group')
prev = cls.accessibles.get(key)
if key in cls.accessibles:
if key == 'target':
kwds['readonly'] = False
prev = cls.accessibles[key]
if key == 'status':
# special case: status from sea is a string, not the status tuple
pobj = prev.copy()
else:
pobj = Parameter(**kwds)
merged_properties = prev.propertyValues.copy()
pobj.updateProperties(merged_properties)
pobj.merge(merged_properties)
else:
pobj = Parameter(**kwds)
datatype = pobj.datatype
if issubclass(cls, SeaWritable) and key == 'target':
kwds['readonly'] = False
attributes['target'] = Parameter(**kwds)
attributes['target'] = pobj = Parameter(**kwds)
if prev:
merged_properties = prev.propertyValues.copy()
pobj.updateProperties(merged_properties)
pobj.merge(merged_properties)
if key in ('value', 'target'):
unit = get_cfg(cfgdict, 'unit')
if unit is not None:
pcfg = cfgdict.get(key, None)
if not isinstance(pcfg, dict):
cfgdict[key] = pcfg = {} if pcfg is None else {'value': pcfg}
pcfg['unit'] = unit
hdbpath = '/'.join([base] + pathlist)
if key in extra_module_set:
@ -693,6 +695,15 @@ class SeaModule(Module):
result = Module.__new__(newcls)
return result
@classmethod
def paramFilter(cls, result, paramdesc):
sub = paramdesc['path'].split('/', 1)
sublist = result.get(sub[0])
if sublist is None:
return False
sublist.append(paramdesc)
return True
def updateEvent(self, module, parameter, value, timestamp, readerror):
upd = getattr(self, 'update_' + parameter, None)
if upd:
@ -709,6 +720,7 @@ class SeaReadable(SeaModule, Readable):
_readerror = None
_status = IDLE, ''
unit = Property('physical unit', StringType(isUTF8=True), default='')
status = Parameter(datatype=StatusType(Readable, 'DISABLED'))
def update_value(self, value, timestamp, readerror):
@ -808,11 +820,6 @@ class SeaDrivable(SeaReadable, Drivable):
return IDLE, f'started, but not running'
return IDLE, ''
def update_target(self, module, parameter, value, timestamp, readerror):
# TODO: check if this is needed
if value is not None:
self.target = value
@Command()
def stop(self):
"""propagate to SEA
@ -821,3 +828,20 @@ class SeaDrivable(SeaReadable, Drivable):
- on EaseDriv this will set the stopped state
"""
self.io.query(f'{self.sea_object} is_running 0')
class LscDrivable(SeaDrivable):
def __new__(cls, name, logger, cfgdict, srv):
cfgdict['rel_paths'] = [pop_cfg(cfgdict, 'sensor_path', 'tm'), '.',
pop_cfg(cfgdict, 'set_path', 'set'), 'dblctrl']
return super().__new__(cls, name, logger, cfgdict, srv)
@classmethod
def paramFilter(cls, result, paramdesc):
if super().paramFilter(result, paramdesc):
return True
pathlist = paramdesc['path'].split('/')
if len(pathlist) == 1 and paramdesc.get('kids', 0) == 0:
result['.'].append(paramdesc)
return True
return False