Change config to Python
- Change Configuration format to be python-based. - move config logic to frappy/config.py - Add first py-config: cryo_cfg.py - Adapt test to new expected config format Change-Id: Iaec484e0e1e21ebbb1e5c74b53be6231329ddf71
This commit is contained in:
parent
db3b190c26
commit
52b77ba9e6
@ -83,7 +83,7 @@ dummy-variables-rgx=_|dummy
|
||||
|
||||
# List of additional names supposed to be defined in builtins. Remember that
|
||||
# you should avoid to define new builtins when possible.
|
||||
additional-builtins=
|
||||
additional-builtins=Node,Mod,Param,Command,Group
|
||||
|
||||
|
||||
[BASIC]
|
||||
|
38
cfg/cryo_cfg.py
Normal file
38
cfg/cryo_cfg.py
Normal file
@ -0,0 +1,38 @@
|
||||
#####################################################################
|
||||
# Python version of frappy config
|
||||
#####################################################################
|
||||
|
||||
Node('cryo_7.frappy.demo',
|
||||
'short description' \
|
||||
'' \
|
||||
'' \
|
||||
'This is a very long description providing all the glory details in all the ' \
|
||||
'glory details about the stuff we are describing',
|
||||
'tcp://10769',
|
||||
more="blub",
|
||||
)
|
||||
|
||||
# obviously not final form
|
||||
Mod('cryo',
|
||||
'frappy_demo.cryo.Cryostat',
|
||||
'A simulated cc cryostat with heat-load, specific heat for the sample and a ' \
|
||||
'temperature dependend heat-link between sample and regulation.',
|
||||
group='very important/stuff',
|
||||
jitter=0.1,
|
||||
T_start=10.0,
|
||||
target=10.0,
|
||||
looptime=1,
|
||||
ramp=6,
|
||||
maxpower=20.0,
|
||||
heater=4.1,
|
||||
mode='pid',
|
||||
tolerance=0.1,
|
||||
window=30,
|
||||
timeout=900,
|
||||
p = Param(40, unit='%/K'), # in case 'default' is the first arg, we can omit 'default='
|
||||
i = 10,
|
||||
d = 2,
|
||||
pid = Group('p', 'i', 'd'),
|
||||
pollinterval = Param(export=False),
|
||||
value = Param(unit = 'K', test = 'customized value'),
|
||||
)
|
184
frappy/config.py
Normal file
184
frappy/config.py
Normal file
@ -0,0 +1,184 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# *****************************************************************************
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or modify it under
|
||||
# the terms of the GNU General Public License as published by the Free Software
|
||||
# Foundation; either version 2 of the License, or (at your option) any later
|
||||
# version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
||||
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
|
||||
# details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License along with
|
||||
# this program; if not, write to the Free Software Foundation, Inc.,
|
||||
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
||||
#
|
||||
# Module authors:
|
||||
# Alexander Zaft <a.zaft@fz-juelich.de>
|
||||
#
|
||||
# *****************************************************************************
|
||||
import os
|
||||
|
||||
from frappy.errors import ConfigError
|
||||
from frappy.lib import generalConfig
|
||||
|
||||
class Undef:
|
||||
pass
|
||||
|
||||
class Node(dict):
|
||||
def __init__(
|
||||
self,
|
||||
equipment_id,
|
||||
description,
|
||||
interface=None,
|
||||
cls='protocol.dispatcher.Dispatcher',
|
||||
omit_unchanged_within=1.1,
|
||||
**kwds
|
||||
):
|
||||
super().__init__(
|
||||
equipment_id=equipment_id,
|
||||
description=description,
|
||||
interface=interface,
|
||||
cls=cls,
|
||||
omit_unchanged_within=omit_unchanged_within,
|
||||
**kwds
|
||||
)
|
||||
|
||||
class Param(dict):
|
||||
def __init__(self, default=Undef, **kwds):
|
||||
if default is not Undef:
|
||||
kwds['default'] = default
|
||||
super().__init__(**kwds)
|
||||
|
||||
class Group(tuple):
|
||||
def __new__(cls, *args):
|
||||
return super().__new__(cls, args)
|
||||
|
||||
class Mod(dict):
|
||||
def __init__(self, name, cls, description, **kwds):
|
||||
super().__init__(
|
||||
name=name,
|
||||
cls=cls,
|
||||
description=description
|
||||
)
|
||||
# Make parameters out of all keywords
|
||||
groups = {}
|
||||
for key, val in kwds.items():
|
||||
if isinstance(val, Param):
|
||||
self[key] = val
|
||||
elif isinstance(val, Group):
|
||||
groups[key] = val
|
||||
else:
|
||||
# shortcut to only set default
|
||||
self[key] = Param(val)
|
||||
for group, members in groups.items():
|
||||
for member in members:
|
||||
self[member]['group'] = group
|
||||
|
||||
class Collector:
|
||||
def __init__(self, cls):
|
||||
self.list = []
|
||||
self.cls = cls
|
||||
|
||||
def add(self, *args, **kwds):
|
||||
self.list.append(self.cls(*args, **kwds))
|
||||
|
||||
def append(self, mod):
|
||||
self.list.append(mod)
|
||||
|
||||
|
||||
class NodeCollector:
|
||||
def __init__(self):
|
||||
self.node = None
|
||||
|
||||
def add(self, *args, **kwds):
|
||||
if self.node is None:
|
||||
self.node = Node(*args, **kwds)
|
||||
else:
|
||||
raise ConfigError('Only one Node is allowed per file!')
|
||||
|
||||
|
||||
class Config(dict):
|
||||
def __init__(self, node, modules):
|
||||
super().__init__(
|
||||
node=node.node,
|
||||
**{mod['name']: mod for mod in modules.list}
|
||||
)
|
||||
self.module_names = {mod.pop('name') for mod in modules.list}
|
||||
self.ambiguous = set()
|
||||
|
||||
def merge_modules(self, other):
|
||||
""" merges only the modules from 'other' into 'self'"""
|
||||
self.ambiguous |= self.module_names & other.module_names
|
||||
for name, mod in other.items():
|
||||
if name == 'node':
|
||||
continue
|
||||
if name not in self.module_names:
|
||||
self.module_names.add(name)
|
||||
self.modules.append(mod)
|
||||
|
||||
|
||||
def process_file(config_text):
|
||||
node = NodeCollector()
|
||||
mods = Collector(Mod)
|
||||
ns = {'Node': node.add, 'Mod': mods.add, 'Param': Param, 'Command': Param, 'Group': Group}
|
||||
|
||||
# pylint: disable=exec-used
|
||||
exec(config_text, ns)
|
||||
return Config(node, mods)
|
||||
|
||||
|
||||
def to_config_path(cfgfile, log):
|
||||
candidates = [cfgfile + e for e in ['_cfg.py', '.py', '']]
|
||||
if os.sep in cfgfile: # specified as full path
|
||||
filename = cfgfile if os.path.exists(cfgfile) else None
|
||||
else:
|
||||
for filename in [os.path.join(d, candidate)
|
||||
for d in generalConfig.confdir.split(os.pathsep)
|
||||
for candidate in candidates]:
|
||||
if os.path.exists(filename):
|
||||
break
|
||||
else:
|
||||
filename = None
|
||||
|
||||
if filename is None:
|
||||
raise ConfigError("Couldn't find cfg file %r in %s"
|
||||
% (cfgfile, generalConfig.confdir))
|
||||
if not filename.endswith('_cfg.py'):
|
||||
log.warning("Config files should end in '_cfg.py': %s", os.path.basename(filename))
|
||||
log.debug('Using config file %s for %s', filename, cfgfile)
|
||||
return filename
|
||||
|
||||
|
||||
def load_config(cfgfiles, log):
|
||||
"""Load config files.
|
||||
|
||||
Only the node-section of the first config file will be returned.
|
||||
The others will be discarded.
|
||||
Arguments
|
||||
- cfgfiles : str
|
||||
Comma separated list of config-files
|
||||
- log : frappy.logging.Mainlogger
|
||||
Logger aquired from frappy.logging
|
||||
Returns
|
||||
- config: Config
|
||||
merged configuration
|
||||
"""
|
||||
config = None
|
||||
for cfgfile in cfgfiles.split(','):
|
||||
filename = to_config_path(cfgfile, log)
|
||||
log.debug('Parsing config file %s...', filename)
|
||||
with open(filename, 'rb') as f:
|
||||
config_text = f.read()
|
||||
cfg = process_file(config_text)
|
||||
if config:
|
||||
config.merge_modules(cfg)
|
||||
else:
|
||||
config = cfg
|
||||
|
||||
if config.ambiguous:
|
||||
log.warning('ambiguous sections in %s: %r',
|
||||
cfgfiles, list(config.ambiguous))
|
||||
return config
|
@ -327,12 +327,12 @@ class Module(HasAccessibles):
|
||||
# pylint: disable=consider-using-dict-items
|
||||
for key in self.propertyDict:
|
||||
value = cfgdict.pop(key, None)
|
||||
if value is None:
|
||||
# legacy cfg: specified as '.<propertyname> = <propertyvalue>'
|
||||
value = cfgdict.pop('.' + key, None)
|
||||
if value is not None:
|
||||
try:
|
||||
self.setProperty(key, value)
|
||||
if isinstance(value, dict):
|
||||
self.setProperty(key, value['default'])
|
||||
else:
|
||||
self.setProperty(key, value)
|
||||
except BadValueError:
|
||||
errors.append('%s: value %r does not match %r!' %
|
||||
(key, value, self.propertyDict[key].datatype))
|
||||
@ -374,24 +374,27 @@ class Module(HasAccessibles):
|
||||
self.commands = {k: v for k, v in accessibles.items() if isinstance(v, Command)}
|
||||
|
||||
# 2) check and apply parameter_properties
|
||||
# specified as '<paramname>.<propertyname> = <propertyvalue>'
|
||||
# this may also be done on commands: e.g. 'stop.visibility = advanced'
|
||||
for k, v in list(cfgdict.items()): # keep list() as dict may change during iter
|
||||
if '.' in k[1:]:
|
||||
aname, propname = k.split('.', 1)
|
||||
propvalue = cfgdict.pop(k)
|
||||
aobj = self.accessibles.get(aname, None)
|
||||
if aobj:
|
||||
try:
|
||||
for aname in list(cfgdict): # keep list() as dict may change during iter
|
||||
aobj = self.accessibles.get(aname, None)
|
||||
if aobj:
|
||||
try:
|
||||
for propname, propvalue in cfgdict[aname].items():
|
||||
# defaults are applied later
|
||||
if propname == 'default':
|
||||
continue
|
||||
aobj.setProperty(propname, propvalue)
|
||||
except KeyError:
|
||||
errors.append("'%s.%s' does not exist" %
|
||||
(aname, propname))
|
||||
except BadValueError as e:
|
||||
errors.append('%s.%s: %s' %
|
||||
(aname, propname, str(e)))
|
||||
else:
|
||||
errors.append('%r not found' % aname)
|
||||
except KeyError:
|
||||
errors.append("'%s' has no property '%s'" %
|
||||
(aname, propname))
|
||||
except BadValueError as e:
|
||||
errors.append('%s.%s: %s' %
|
||||
(aname, propname, str(e)))
|
||||
else:
|
||||
errors.append('%r not found' % aname)
|
||||
# 3) commands do not need a default, remove them from cfgdict:
|
||||
for aname in list(cfgdict):
|
||||
if aname in self.commands:
|
||||
cfgdict.pop(aname)
|
||||
|
||||
# 3) check config for problems:
|
||||
# only accept remaining config items specified in parameters
|
||||
@ -413,11 +416,11 @@ class Module(HasAccessibles):
|
||||
errors.append('%s needs a datatype' % pname)
|
||||
continue
|
||||
|
||||
if pname in cfgdict:
|
||||
if pname in cfgdict and 'default' in cfgdict[pname]:
|
||||
if pobj.initwrite is not False and hasattr(self, 'write_' + pname):
|
||||
# parameters given in cfgdict have to call write_<pname>
|
||||
try:
|
||||
pobj.value = pobj.datatype(cfgdict[pname])
|
||||
pobj.value = pobj.datatype(cfgdict[pname]['default'])
|
||||
self.writeDict[pname] = pobj.value
|
||||
except BadValueError as e:
|
||||
errors.append('%s: %s' % (pname, e))
|
||||
@ -443,7 +446,8 @@ class Module(HasAccessibles):
|
||||
pobj.value = value
|
||||
self.writeDict[pname] = value
|
||||
else:
|
||||
cfgdict[pname] = value
|
||||
# dict to fit in with parameters coming from config
|
||||
cfgdict[pname] = { 'default' : value }
|
||||
|
||||
# 5) 'apply' config:
|
||||
# pass values through the datatypes and store as attributes
|
||||
@ -452,7 +456,8 @@ class Module(HasAccessibles):
|
||||
# this checks also for the proper datatype
|
||||
# note: this will NOT call write_* methods!
|
||||
if k in self.parameters or k in self.propertyDict:
|
||||
setattr(self, k, v)
|
||||
if 'default' in v:
|
||||
setattr(self, k, v['default'])
|
||||
cfgdict.pop(k)
|
||||
except (ValueError, TypeError) as e:
|
||||
# self.log.exception(formatExtendedStack())
|
||||
|
@ -63,7 +63,7 @@ def make_update(modulename, pobj):
|
||||
class Dispatcher:
|
||||
def __init__(self, name, logger, options, srv):
|
||||
# to avoid errors, we want to eat all options here
|
||||
self.equipment_id = options.pop('id', name)
|
||||
self.equipment_id = options.pop('equipment_id', name)
|
||||
# time interval for omitting updates of unchanged values
|
||||
self.omit_unchanged_within = options.pop('omit_unchanged_within', 0.1)
|
||||
self.nodeprops = {}
|
||||
|
@ -23,8 +23,6 @@
|
||||
# *****************************************************************************
|
||||
"""Define helpers"""
|
||||
|
||||
import ast
|
||||
import configparser
|
||||
import os
|
||||
import sys
|
||||
import traceback
|
||||
@ -35,6 +33,7 @@ from frappy.lib import formatException, get_class, generalConfig
|
||||
from frappy.lib.multievent import MultiEvent
|
||||
from frappy.params import PREDEFINED_ACCESSIBLES
|
||||
from frappy.modules import Attached
|
||||
from frappy.config import load_config
|
||||
|
||||
try:
|
||||
from daemon import DaemonContext
|
||||
@ -94,75 +93,19 @@ class Server:
|
||||
# sanitize name (in case it is a cfgfile)
|
||||
name = os.path.splitext(os.path.basename(name))[0]
|
||||
self.log = parent_logger.getChild(name, True)
|
||||
merged_cfg = OrderedDict()
|
||||
ambiguous_sections = set()
|
||||
for cfgfile in cfgfiles.split(','):
|
||||
cfgdict = self.loadCfgFile(cfgfile)
|
||||
ambiguous_sections |= set(merged_cfg) & set(cfgdict)
|
||||
merged_cfg.update(cfgdict)
|
||||
self.node_cfg = merged_cfg.pop('NODE', {})
|
||||
self.interface_cfg = merged_cfg.pop('INTERFACE', {})
|
||||
|
||||
merged_cfg = load_config(cfgfiles, self.log)
|
||||
self.node_cfg = merged_cfg.pop('node')
|
||||
self.module_cfg = merged_cfg
|
||||
if interface:
|
||||
ambiguous_sections.discard('interface')
|
||||
ambiguous_sections.discard('node')
|
||||
self.node_cfg['name'] = name
|
||||
self.node_cfg['id'] = cfgfiles
|
||||
self.interface_cfg['uri'] = str(interface)
|
||||
elif 'uri' not in self.interface_cfg:
|
||||
raise ConfigError('missing interface uri')
|
||||
if ambiguous_sections:
|
||||
self.log.warning('ambiguous sections in %s: %r' % (cfgfiles, tuple(ambiguous_sections)))
|
||||
self.node_cfg['equipment_id'] = name
|
||||
self.node_cfg['interface'] = str(interface)
|
||||
elif not self.node_cfg.get('interface'):
|
||||
raise ConfigError('No interface specified in configuration or arguments!')
|
||||
|
||||
self._cfgfiles = cfgfiles
|
||||
self._pidfile = os.path.join(generalConfig.piddir, name + '.pid')
|
||||
|
||||
def loadCfgFile(self, cfgfile):
|
||||
if not cfgfile.endswith('.cfg'):
|
||||
cfgfile += '.cfg'
|
||||
if os.sep in cfgfile: # specified as full path
|
||||
filename = cfgfile if os.path.exists(cfgfile) else None
|
||||
else:
|
||||
for filename in [os.path.join(d, cfgfile) for d in generalConfig.confdir.split(os.pathsep)]:
|
||||
if os.path.exists(filename):
|
||||
break
|
||||
else:
|
||||
filename = None
|
||||
if filename is None:
|
||||
raise ConfigError("Couldn't find cfg file %r in %s" % (cfgfile, generalConfig.confdir))
|
||||
self.log.debug('Parse config file %s ...' % filename)
|
||||
result = OrderedDict()
|
||||
parser = configparser.ConfigParser()
|
||||
parser.optionxform = str
|
||||
if not parser.read([filename]):
|
||||
raise ConfigError("Couldn't read cfg file %r" % filename)
|
||||
for section, options in parser.items():
|
||||
if section == 'DEFAULT':
|
||||
continue
|
||||
opts = {}
|
||||
for k, v in options.items():
|
||||
# is the following really needed? - ConfigParser supports multiple lines!
|
||||
while '\n.\n' in v:
|
||||
v = v.replace('\n.\n', '\n\n')
|
||||
try:
|
||||
opts[k] = ast.literal_eval(v)
|
||||
except Exception:
|
||||
opts[k] = v
|
||||
# convert old form
|
||||
name, _, arg = section.partition(' ')
|
||||
if arg:
|
||||
if name == 'node':
|
||||
name = 'NODE'
|
||||
opts['id'] = arg
|
||||
elif name == 'interface':
|
||||
name = 'INTERFACE'
|
||||
if 'bindport' in opts:
|
||||
opts.pop('bindto', None)
|
||||
opts['uri'] = '%s://%s' % (opts.pop('type', arg), opts.pop('bindport'))
|
||||
elif name == 'module':
|
||||
name = arg
|
||||
result[name] = opts
|
||||
return result
|
||||
|
||||
def start(self):
|
||||
if not DaemonContext:
|
||||
raise ConfigError('can not daemonize, as python-daemon is not installed')
|
||||
@ -196,7 +139,7 @@ class Server:
|
||||
print(formatException(verbose=True))
|
||||
raise
|
||||
|
||||
opts = dict(self.interface_cfg)
|
||||
opts = {'uri': self.node_cfg['interface']}
|
||||
scheme, _, _ = opts['uri'].rpartition('://')
|
||||
scheme = scheme or 'tcp'
|
||||
cls = get_class(self.INTERFACES[scheme])
|
||||
@ -226,8 +169,9 @@ class Server:
|
||||
def _processCfg(self):
|
||||
errors = []
|
||||
opts = dict(self.node_cfg)
|
||||
cls = get_class(opts.pop('class', 'protocol.dispatcher.Dispatcher'))
|
||||
cls = get_class(opts.pop('cls'))
|
||||
self.dispatcher = cls(opts.pop('name', self._cfgfiles), self.log.getChild('dispatcher'), opts, self)
|
||||
|
||||
if opts:
|
||||
errors.append(self.unknown_options(cls, opts))
|
||||
self.modules = OrderedDict()
|
||||
@ -238,7 +182,7 @@ class Server:
|
||||
opts = dict(options)
|
||||
pymodule = None
|
||||
try:
|
||||
classname = opts.pop('class')
|
||||
classname = opts.pop('cls')
|
||||
pymodule = classname.rpartition('.')[0]
|
||||
if pymodule in failed:
|
||||
continue
|
||||
|
@ -62,6 +62,7 @@ class ServerStub:
|
||||
class ModuleTest(Module):
|
||||
def __init__(self, updates=None, **opts):
|
||||
opts['description'] = ''
|
||||
opts = {p: {'default': val} for p, val in opts.items()}
|
||||
super().__init__('mod', logger, opts, ServerStub(updates or {}))
|
||||
|
||||
|
||||
|
@ -77,7 +77,7 @@ class DummyMultiEvent(threading.Event):
|
||||
|
||||
|
||||
def test_Communicator():
|
||||
o = Communicator('communicator', LoggerStub(), {'.description': ''}, ServerStub({}))
|
||||
o = Communicator('communicator', LoggerStub(), {'description': ''}, ServerStub({}))
|
||||
o.earlyInit()
|
||||
o.initModule()
|
||||
event = DummyMultiEvent()
|
||||
@ -177,8 +177,8 @@ def test_ModuleMagic():
|
||||
objects = []
|
||||
|
||||
for newclass, sortcheck in [(Newclass1, sortcheck1), (Newclass2, sortcheck2)]:
|
||||
o1 = newclass('o1', logger, {'.description':''}, srv)
|
||||
o2 = newclass('o2', logger, {'.description':''}, srv)
|
||||
o1 = newclass('o1', logger, {'description':''}, srv)
|
||||
o2 = newclass('o2', logger, {'description':''}, srv)
|
||||
for obj in [o1, o2]:
|
||||
objects.append(obj)
|
||||
for o in obj.accessibles.values():
|
||||
@ -188,7 +188,7 @@ def test_ModuleMagic():
|
||||
assert list(obj.accessibles) == sortcheck
|
||||
|
||||
# check for inital updates working properly
|
||||
o1 = Newclass1('o1', logger, {'.description':''}, srv)
|
||||
o1 = Newclass1('o1', logger, {'description':''}, srv)
|
||||
expectedBeforeStart = {'target': '', 'status': (Drivable.Status.IDLE, ''),
|
||||
'param1': False, 'param2': 1.0, 'a1': 0.0, 'a2': True, 'pollinterval': 5.0,
|
||||
'value': 'first'}
|
||||
@ -205,7 +205,7 @@ def test_ModuleMagic():
|
||||
assert updates.pop('o1') == expectedAfterStart
|
||||
|
||||
# check in addition if parameters are written
|
||||
o2 = Newclass2('o2', logger, {'.description':'', 'a1': 2.7}, srv)
|
||||
o2 = Newclass2('o2', logger, {'description':'', 'a1': {'default': 2.7}}, srv)
|
||||
# no update for b2, as this has to be written
|
||||
expectedBeforeStart['a1'] = 2.7
|
||||
expectedBeforeStart['target'] = 0.0
|
||||
@ -224,10 +224,10 @@ def test_ModuleMagic():
|
||||
|
||||
assert not updates
|
||||
|
||||
o1 = Newclass1('o1', logger, {'.description':''}, srv)
|
||||
o2 = Newclass2('o2', logger, {'.description':''}, srv)
|
||||
o1 = Newclass1('o1', logger, {'description':''}, srv)
|
||||
o2 = Newclass2('o2', logger, {'description':''}, srv)
|
||||
assert o2.parameters['a1'].datatype.unit == 'deg/s'
|
||||
o2 = Newclass2('o2', logger, {'.description':'', 'value.unit':'mm', 'param2.unit':'mm'}, srv)
|
||||
o2 = Newclass2('o2', logger, {'description':'', 'value':{'unit':'mm'},'param2':{'unit':'mm'}}, srv)
|
||||
# check datatype is not shared
|
||||
assert o1.parameters['param2'].datatype.unit == 'Ohm'
|
||||
assert o2.parameters['param2'].datatype.unit == 'mm'
|
||||
@ -374,13 +374,13 @@ def test_command_check():
|
||||
with pytest.raises(ProgrammingError):
|
||||
BadDatatype('o', logger, {
|
||||
'description': '',
|
||||
'cmd.argument': {'type': 'double', 'min': 1, 'max': 0},
|
||||
'cmd': {'argument': {'type': 'double', 'min': 1, 'max': 0}},
|
||||
}, srv)
|
||||
|
||||
with pytest.raises(ProgrammingError):
|
||||
BadDatatype('o', logger, {
|
||||
'description': '',
|
||||
'cmd.visibility': 'invalid',
|
||||
'cmd': {'visibility': 'invalid'},
|
||||
}, srv)
|
||||
|
||||
|
||||
@ -413,17 +413,15 @@ def test_mixin():
|
||||
|
||||
MixedDrivable('o', logger, {
|
||||
'description': '',
|
||||
'param1.description': 'param 1',
|
||||
'param1': 0,
|
||||
'param2.datatype': {"type": "double"},
|
||||
'param1': {'default': 0, 'description': 'param1'},
|
||||
'param2': {'datatype': {"type": "double"}},
|
||||
}, srv)
|
||||
|
||||
with pytest.raises(ConfigError):
|
||||
MixedReadable('o', logger, {
|
||||
'description': '',
|
||||
'param1.description': 'param 1',
|
||||
'param1': 0,
|
||||
'param2.datatype': {"type": "double"},
|
||||
'param1': {'default': 0, 'description': 'param1'},
|
||||
'param2': {'datatype': {"type": "double"}},
|
||||
}, srv)
|
||||
|
||||
|
||||
@ -455,7 +453,7 @@ def test_command_config():
|
||||
srv = ServerStub({})
|
||||
mod = Mod('o', logger, {
|
||||
'description': '',
|
||||
'convert.argument': {'type': 'bool'},
|
||||
'convert': {'argument': {'type': 'bool'}},
|
||||
}, srv)
|
||||
assert mod.commands['convert'].datatype.export_datatype() == {
|
||||
'type': 'command',
|
||||
@ -465,7 +463,7 @@ def test_command_config():
|
||||
|
||||
mod = Mod('o', logger, {
|
||||
'description': '',
|
||||
'convert.datatype': {'type': 'command', 'argument': {'type': 'bool'}, 'result': {'type': 'bool'}},
|
||||
'convert': {'datatype': {'type': 'command', 'argument': {'type': 'bool'}, 'result': {'type': 'bool'}}},
|
||||
}, srv)
|
||||
assert mod.commands['convert'].datatype.export_datatype() == {
|
||||
'type': 'command',
|
||||
@ -529,7 +527,7 @@ def test_generic_access():
|
||||
updates = {}
|
||||
srv = ServerStub(updates)
|
||||
|
||||
obj = Mod('obj', logger, {'description': '', 'param': 'initial value'}, srv)
|
||||
obj = Mod('obj', logger, {'description': '', 'param': {'default':'initial value'}}, srv)
|
||||
assert obj.param == 'initial value'
|
||||
assert obj.write_param('Cheese') == 'cheese'
|
||||
assert obj.write_unhandled('Cheese') == 'Cheese'
|
||||
@ -590,7 +588,7 @@ def test_no_read_write():
|
||||
updates = {}
|
||||
srv = ServerStub(updates)
|
||||
|
||||
obj = Mod('obj', logger, {'description': '', 'param': 'cheese'}, srv)
|
||||
obj = Mod('obj', logger, {'description': '', 'param': {'default': 'cheese'}}, srv)
|
||||
assert obj.param == 'cheese'
|
||||
assert obj.read_param() == 'cheese'
|
||||
assert updates == {'obj': {'param': 'cheese'}}
|
||||
@ -630,7 +628,7 @@ def test_problematic_value_range():
|
||||
|
||||
srv = ServerStub({})
|
||||
|
||||
obj = Mod('obj', logger, {'description': '', 'value.max': 10.1}, srv) # pylint: disable=unused-variable
|
||||
obj = Mod('obj', logger, {'description': '', 'value':{'max': 10.1}}, srv) # pylint: disable=unused-variable
|
||||
|
||||
with pytest.raises(ConfigError):
|
||||
obj = Mod('obj', logger, {'description': ''}, srv)
|
||||
@ -640,16 +638,16 @@ def test_problematic_value_range():
|
||||
target = Parameter('', FloatRange(), default=0)
|
||||
|
||||
obj = Mod2('obj', logger, {'description': ''}, srv)
|
||||
obj = Mod2('obj', logger, {'description': '', 'target.min': 0, 'target.max': 10}, srv)
|
||||
obj = Mod2('obj', logger, {'description': '', 'target':{'min': 0, 'max': 10}}, srv)
|
||||
|
||||
with pytest.raises(ConfigError):
|
||||
obj = Mod('obj', logger, {
|
||||
'value.min': 0, 'value.max': 10,
|
||||
'target.min': 0, 'target.max': 10, 'description': ''}, srv)
|
||||
'value':{'min': 0, 'max': 10},
|
||||
'target':{'min': 0, 'max': 10}, 'description': ''}, srv)
|
||||
|
||||
obj = Mod('obj', logger, {'disable_value_range_check': True,
|
||||
'value.min': 0, 'value.max': 10,
|
||||
'target.min': 0, 'target.max': 10, 'description': ''}, srv)
|
||||
'value': {'min': 0, 'max': 10},
|
||||
'target': {'min': 0, 'max': 10}, 'description': ''}, srv)
|
||||
|
||||
generalConfig.defaults['disable_value_range_check'] = True
|
||||
|
||||
@ -657,15 +655,15 @@ def test_problematic_value_range():
|
||||
value = Parameter('', FloatRange(0, 10), default=0)
|
||||
target = Parameter('', FloatRange(0, 10), default=0)
|
||||
obj = Mod4('obj', logger, {
|
||||
'value.min': 0, 'value.max': 10,
|
||||
'target.min': 0, 'target.max': 10, 'description': ''}, srv)
|
||||
'value': {'min': 0, 'max': 10},
|
||||
'target': {'min': 0, 'max': 10}, 'description': ''}, srv)
|
||||
|
||||
|
||||
@pytest.mark.parametrize('config, dynamicunit, finalunit, someunit', [
|
||||
({}, 'K', 'K', 'K'),
|
||||
({'value.unit': 'K'}, 'C', 'C', 'C'),
|
||||
({'value.unit': 'K'}, '', 'K', 'K'),
|
||||
({'value.unit': 'K', 'someparam.unit': 'A'}, 'C', 'C', 'A'),
|
||||
({'value':{'unit': 'K'}}, 'C', 'C', 'C'),
|
||||
({'value':{'unit': 'K'}}, '', 'K', 'K'),
|
||||
({'value':{'unit': 'K'}, 'someparam':{'unit': 'A'}}, 'C', 'C', 'A'),
|
||||
])
|
||||
def test_deferred_main_unit(config, dynamicunit, finalunit, someunit):
|
||||
# this pattern is used in frappy_mlz.entangle.AnalogInput
|
||||
|
@ -254,7 +254,7 @@ class Mod(HasStates, Drivable):
|
||||
|
||||
def create_module():
|
||||
updates = []
|
||||
obj = Mod('obj', LoggerStub(), {'.description': ''}, ServerStub(updates))
|
||||
obj = Mod('obj', LoggerStub(), {'description': ''}, ServerStub(updates))
|
||||
obj.initModule()
|
||||
obj.statelist = []
|
||||
try:
|
||||
|
Loading…
x
Reference in New Issue
Block a user