simplify configuration of IO modules
As the communicator class needed for a module can be specified, in the configuration we do not need to specifiy it explicitly. A new configurator function IO() is introduced for this, defining names and uri only. - update also configuration reference and a tutorial example - update get_class function to accept attributes of classes like 'frappy_demo.lakshore.TemperatureSensor.ioClass' and import from modules other than frappy... like 'test.test_iocfg.Mod'. - add ioClass to the example class for the temperature controller tutorial Change-Id: I3115371d612f14024e43bc6d38b642e1d27b314d Reviewed-on: https://forge.frm2.tum.de/review/c/secop/frappy/+/38071 Tested-by: Jenkins Automated Tests <pedersen+jenkins@frm2.tum.de> Reviewed-by: Markus Zolliker <markus.zolliker@psi.ch>
This commit is contained in:
@@ -114,14 +114,18 @@ class Collector:
|
||||
self.modules = {}
|
||||
self.warnings = []
|
||||
|
||||
def add(self, *args, **kwds):
|
||||
mod = Mod(*args, **kwds)
|
||||
def add(self, name, cls, description, **kwds):
|
||||
mod = Mod(name, cls, description, **kwds)
|
||||
name = mod.pop('name')
|
||||
if name in self.modules:
|
||||
self.warnings.append(f'duplicate module {name} overrides previous')
|
||||
self.modules[name] = mod
|
||||
return mod
|
||||
|
||||
def add_io(self, name, uri, **kwds):
|
||||
mod = Mod(name, cls='<auto>', description='', uri=uri, **kwds)
|
||||
self.modules[mod.pop('name')] = mod
|
||||
|
||||
def override(self, name, **kwds):
|
||||
"""override properties/parameters of previously configured modules
|
||||
|
||||
@@ -180,12 +184,37 @@ class Include:
|
||||
exec(compile(filename.read_bytes(), filename, 'exec'), self.namespace)
|
||||
|
||||
|
||||
def process_file(filename, log):
|
||||
config_text = filename.read_bytes()
|
||||
def fix_io_modules(cfgdict, log):
|
||||
node = cfgdict.pop('node')
|
||||
io_modules = {k: [] for k, v in cfgdict.items() if v.get('cls') == '<auto>'}
|
||||
for modname, modcfg in cfgdict.items():
|
||||
ioname = modcfg.get('io', {}).get('value')
|
||||
if ioname:
|
||||
iocfg = cfgdict.get(ioname)
|
||||
if iocfg:
|
||||
referring_modules = io_modules.get(ioname)
|
||||
if referring_modules is not None:
|
||||
iocfg['cls'] = f"{modcfg['cls']}.ioClass"
|
||||
referring_modules.append(modname)
|
||||
|
||||
# fix description:
|
||||
for ioname, referring_modules in io_modules.items():
|
||||
if referring_modules:
|
||||
if not cfgdict[ioname]['description']:
|
||||
cfgdict[ioname]['description'] = f"communicator for {', '.join(k for k in referring_modules)}"
|
||||
else:
|
||||
log.warning('remove unused io module %r', ioname)
|
||||
cfgdict.pop(ioname)
|
||||
cfgdict['node'] = node
|
||||
|
||||
|
||||
def process_file(filename, log, config_text=None):
|
||||
if config_text is None:
|
||||
config_text = filename.read_bytes()
|
||||
node = NodeCollector()
|
||||
mods = Collector()
|
||||
ns = {'Node': node.add, 'Mod': mods.add, 'Param': Param, 'Command': Param, 'Group': Group,
|
||||
'override': mods.override, 'overrideNode': node.override}
|
||||
'override': mods.override, 'overrideNode': node.override, 'IO': mods.add_io}
|
||||
ns['include'] = Include(ns, log)
|
||||
# pylint: disable=exec-used
|
||||
exec(compile(config_text, filename, 'exec'), ns)
|
||||
@@ -236,6 +265,7 @@ def load_config(cfgfiles, log):
|
||||
filename = to_config_path(str(cfgfile), log)
|
||||
log.debug('Parsing config file %s...', filename)
|
||||
cfg = process_file(filename, log)
|
||||
fix_io_modules(cfg, log)
|
||||
if config:
|
||||
config.merge_modules(cfg)
|
||||
else:
|
||||
|
||||
@@ -234,22 +234,37 @@ def clamp(_min, value, _max):
|
||||
i.e. value if min <= value <= max, else min or max depending on which side
|
||||
value lies outside the [min..max] interval. This works even when min > max!
|
||||
"""
|
||||
# return median, i.e. clamp the the value between min and max
|
||||
# return median, i.e. clamp the value between min and max
|
||||
return sorted([_min, value, _max])[1]
|
||||
|
||||
|
||||
def get_class(spec):
|
||||
"""loads a class given by string in dotted notation (as python would do)"""
|
||||
modname, classname = spec.rsplit('.', 1)
|
||||
if modname.startswith('frappy'):
|
||||
module = importlib.import_module(modname)
|
||||
else:
|
||||
# rarely needed by now....
|
||||
module = importlib.import_module('frappy.' + modname)
|
||||
try:
|
||||
return getattr(module, classname)
|
||||
except AttributeError:
|
||||
raise AttributeError('no such class') from None
|
||||
"""loads an object given by string in dotted notation (as python would do)
|
||||
|
||||
import the specified module and get the specified item from it
|
||||
examples: 'frappy_demo.lakeshore.TemperatureSensor', 'frappy.modules.Readable.Status'
|
||||
|
||||
:param spec: a dot-separated list of module names followed by the name of
|
||||
a class (or any object) and optionally names of attributes
|
||||
:return: the object
|
||||
"""
|
||||
for maxsplit in range(1, len(spec)):
|
||||
# len(spec) is high enough for all cases
|
||||
module, *attrs = spec.rsplit('.', maxsplit)
|
||||
try:
|
||||
obj = importlib.import_module(module)
|
||||
break
|
||||
except ImportError:
|
||||
if '.' in module:
|
||||
continue
|
||||
raise
|
||||
for na, attr in enumerate(attrs):
|
||||
try:
|
||||
obj = getattr(obj, attr)
|
||||
except AttributeError:
|
||||
print(na, attrs)
|
||||
raise AttributeError(f'{".".join(attrs[:na+1])!r} not found in {module!r}') from None
|
||||
return obj
|
||||
|
||||
|
||||
def mkthread(func, *args, **kwds):
|
||||
|
||||
Reference in New Issue
Block a user