fix several regressions from switching to accessibles

also include some basic tests now

Change-Id: Ia07892c03f4d72f5da307a79a9827f926940881d
Reviewed-on: https://forge.frm2.tum.de/review/18539
Tested-by: JenkinsCodeReview <bjoern_pedersen@frm2.tum.de>
Reviewed-by: Enrico Faulhaber <enrico.faulhaber@frm2.tum.de>
Tested-by: Enrico Faulhaber <enrico.faulhaber@frm2.tum.de>
This commit is contained in:
Enrico Faulhaber 2018-07-26 17:53:00 +02:00
parent 5b273e36a7
commit b1f1653ebd
10 changed files with 283 additions and 80 deletions

View File

@ -65,6 +65,10 @@ class ModuleMeta(type):
(so the dispatcher will get notfied of changed values)
"""
def __new__(mcs, name, bases, attrs):
commands = attrs.pop('commands', {})
parameters = attrs.pop('parameters', {})
overrides = attrs.pop('overrides', {})
newtype = type.__new__(mcs, name, bases, attrs)
if '__constructed__' in attrs:
return newtype
@ -82,17 +86,15 @@ class ModuleMeta(type):
for base in reversed(bases):
if hasattr(base, "accessibles"):
accessibles_list.append(base.accessibles)
for entry in ['accessibles', 'parameters', 'commands', 'overrides']:
accessibles_list.append(attrs.get(entry, {}))
accessibles = {} # unordered dict of accessibles
newtype.parameters = {}
for accessibles in [attrs.get('accessibles', {}), parameters, commands, overrides]:
accessibles_list.append(accessibles)
accessibles = {} # unordered dict of accessibles, will be sorted later
for accessibles_dict in accessibles_list:
for key, obj in accessibles_dict.items():
if isinstance(obj, Override):
try:
obj = obj.apply(accessibles[key])
accessibles[key] = obj
newtype.parameters[key] = obj
except KeyError:
raise ProgrammingError("module %s: %s does not exist"
% (name, key))
@ -104,16 +106,16 @@ class ModuleMeta(type):
# raise ProgrammingError("module %s: %s must not be redefined"
# % (name, key))
if isinstance(obj, Parameter):
newtype.parameters[key] = obj
accessibles[key] = obj
elif isinstance(obj, Command):
# XXX: convert to param with datatype=CommandType???
accessibles[key] = obj
else:
raise ProgrammingError('%r: accessibles entry %r should be a '
'Parameter or Command object!' % (name, key))
# Correct naming of EnumTypes
for k, v in newtype.parameters.items():
for k, v in accessibles.items():
if isinstance(v.datatype, EnumType) and not v.datatype._enum.name:
v.datatype._enum.name = k
@ -123,10 +125,13 @@ class ModuleMeta(type):
newtype.accessibles = OrderedDict(sorted(accessibles.items(), key=lambda item: item[1].ctr))
# check validity of Parameter entries
for pname, pobj in newtype.parameters.items():
for pname, pobj in newtype.accessibles.items():
# XXX: create getters for the units of params ??
# wrap of reading/writing funcs
if isinstance(pobj, Command):
# skip commands for now
continue
rfunc = attrs.get('read_' + pname, None)
for base in bases:
if rfunc is not None:
@ -140,7 +145,7 @@ class ModuleMeta(type):
else:
# return cached value
self.log.debug("rfunc(%s): return cached value" % pname)
value = self.parameters[pname].value
value = self.accessibles[pname].value
setattr(self, pname, value) # important! trigger the setter
return value
@ -159,13 +164,13 @@ class ModuleMeta(type):
def wrapped_wfunc(self, value, pname=pname, wfunc=wfunc):
self.log.debug("wfunc(%s): set %r" % (pname, value))
pobj = self.parameters[pname]
pobj = self.accessibles[pname]
value = pobj.datatype.validate(value)
if wfunc:
self.log.debug('calling %r(%r)' % (wfunc, value))
value = wfunc(self, value) or value
# XXX: use setattr or direct manipulation
# of self.parameters[pname]?
# of self.accessibles[pname]?
setattr(self, pname, value)
return value
@ -176,16 +181,16 @@ class ModuleMeta(type):
wrapped_wfunc.__wrapped__ = True
def getter(self, pname=pname):
return self.parameters[pname].value
return self.accessibles[pname].value
def setter(self, value, pname=pname):
pobj = self.parameters[pname]
pobj = self.accessibles[pname]
value = pobj.datatype.validate(value)
pobj.timestamp = time.time()
if (not EVENT_ONLY_ON_CHANGED_VALUES) or (value != pobj.value):
pobj.value = value
# also send notification
if self.parameters[pname].export:
if self.accessibles[pname].export:
self.log.debug('%s is now %r' % (pname, value))
self.DISPATCHER.announce_update(self, pname, pobj)
@ -194,7 +199,7 @@ class ModuleMeta(type):
# check information about Command's
for attrname in attrs:
if attrname.startswith('do_'):
if attrname[3:] not in newtype.commands:
if attrname[3:] not in newtype.accessibles:
raise ProgrammingError('%r: command %r has to be specified '
'explicitly!' % (name, attrname[3:]))
attrs['__constructed__'] = True

View File

@ -81,27 +81,17 @@ class Module(object):
self.DISPATCHER = dispatcher
self.log = logger
self.name = modname
# make local copies of parameter objects
# they need to be individual per instance since we use them also
# to cache the current value + qualifiers...
params = {}
for k, v in list(self.parameters.items()):
entry = v.copy()
if '$' in entry.unit:
entry.unit = entry.unit.replace('$', self.parameters['value'].unit)
params[k] = entry
# do not re-use self.parameters as this is the same for all instances
self.parameters = params
# make local copies of properties
# handle module properties
# 1) make local copies of properties
# XXX: self.properties = self.properties.copy() ???
props = {}
for k, v in list(self.properties.items()):
props[k] = v
self.properties = props
# check and apply properties specified in cfgdict
# moduleproperties are to be specified as
# '.<propertyname>=<propertyvalue>'
# 2) check and apply properties specified in cfgdict
# specified as '.<propertyname> = <propertyvalue>'
for k, v in list(cfgdict.items()): # keep list() as dict may change during iter
if k[0] == '.':
if k[1:] in self.properties:
@ -109,42 +99,61 @@ class Module(object):
else:
raise ConfigError('Module %r has no property %r' %
(self.name, k[1:]))
# remove unset (default) module properties
# 3) remove unset (default) module properties
for k, v in list(self.properties.items()): # keep list() as dict may change during iter
if v is None:
del self.properties[k]
# MAGIC: derive automatic properties
# 4) set automatic properties
mycls = self.__class__
myclassname = '%s.%s' % (mycls.__module__, mycls.__name__)
self.properties['_implementation'] = myclassname
self.properties['interface_class'] = [
b.__name__ for b in mycls.__mro__ if b.__module__.startswith('secop.modules')]
# check and apply parameter_properties
# specified as '<paramname>.<propertyname> = <propertyvalue>'
# handle Features
# XXX: todo
# handle accessibles
# 1) make local copies of parameter objects
# they need to be individual per instance since we use them also
# to cache the current value + qualifiers...
accessibles = {}
for k, v in self.accessibles.items():
# make a copy of the Parameter/Command object
accessibles[k] = v.copy()
# do not re-use self.accessibles as this is the same for all instances
self.accessibles = accessibles
# 2) check and apply parameter_properties
# specified as '<paramname>.<propertyname> = <propertyvalue>'
for k, v in list(cfgdict.items()): # keep list() as dict may change during iter
if '.' in k[1:]:
paramname, propname = k.split('.', 1)
if paramname in self.parameters:
paramobj = self.parameters[paramname]
paramobj = self.accessibles.get(paramname, None)
if paramobj:
if propname == 'datatype':
paramobj.datatype = get_datatype(cfgdict.pop(k))
elif hasattr(paramobj, propname):
setattr(paramobj, propname, cfgdict.pop(k))
else:
raise ConfigError('Module %s: Parameter %r has not property %r!' %
(self.name, paramname, propname))
# check config for problems
# only accept remaining config items specified in parameters
# 3) check config for problems:
# only accept remaining config items specified in parameters
for k, v in cfgdict.items():
if k not in self.parameters:
if k not in self.accessibles:
raise ConfigError(
'Module %s:config Parameter %r '
'not unterstood! (use one of %s)' %
(self.name, k, ', '.join(self.parameters)))
(self.name, k, ', '.join(n for n,o in self.accessibles if isinstance(o, Parameter))))
# complain if a Parameter entry has no default value and
# is not specified in cfgdict
for k, v in self.parameters.items():
# 4) complain if a Parameter entry has no default value and
# is not specified in cfgdict
for k, v in self.accessibles.items():
if not isinstance(v, Parameter):
continue
if k not in cfgdict:
if v.default is unset_value and k != 'value':
# unset_value is the one single value you can not specify....
@ -154,16 +163,13 @@ class Module(object):
# assume default value was given
cfgdict[k] = v.default
# replace CLASS level Parameter objects with INSTANCE level ones
# self.parameters[k] = self.parameters[k].copy() # already done above...
# now 'apply' config:
# pass values through the datatypes and store as attributes
# 5) 'apply' config:
# pass values through the datatypes and store as attributes
for k, v in cfgdict.items():
if k == 'value':
continue
# apply datatype, complain if type does not fit
datatype = self.parameters[k].datatype
datatype = self.accessibles[k].datatype
try:
v = datatype.validate(v)
except (ValueError, TypeError):
@ -175,6 +181,14 @@ class Module(object):
# write to the hardware, if possible!
setattr(self, k, v)
# Adopt units AFTER applying the cfgdict
for k, v in self.accessibles.items():
if not isinstance(v, Parameter):
continue
if '$' in v.unit:
v.unit = v.unit.replace('$', self.accessibles['value'].unit)
def init(self):
# may be overriden in derived classes to init stuff
self.log.debug('empty init()')
@ -258,7 +272,9 @@ class Readable(Module):
def poll(self, nr=0):
# Just poll all parameters regularly where polling is enabled
for pname, pobj in self.parameters.items():
for pname, pobj in self.accessibles.items():
if not isinstance(pobj, Parameter):
continue
if not pobj.poll:
continue
if nr % abs(int(pobj.poll)) == 0:
@ -309,7 +325,9 @@ class Drivable(Writable):
# poll status first
stat = self.read_status(0)
fastpoll = stat[0] == self.Status.BUSY
for pname, pobj in self.parameters.items():
for pname, pobj in self.accessibles.items():
if not isinstance(pobj, Parameter):
continue
if not pobj.poll:
continue
if pname == 'status':

View File

@ -34,7 +34,6 @@ class CountedObj(object):
cl[0] += 1
self.ctr = cl[0]
class Parameter(CountedObj):
"""storage for Parameter settings + value + qualifiers
@ -96,7 +95,9 @@ class Parameter(CountedObj):
def copy(self):
# return a copy of ourselfs
return Parameter(**self.__dict__)
params = self.__dict__.copy()
params.pop('ctr')
return Parameter(**params)
def for_export(self):
# used for serialisation only
@ -143,6 +144,7 @@ class Override(CountedObj):
raise ProgrammingError(
"Can not apply Override(%s=%r) to %r: non-existing property!" %
(k, v, props))
props['ctr'] = self.ctr
return Parameter(**props)
else:
raise ProgrammingError(
@ -153,16 +155,20 @@ class Override(CountedObj):
class Command(CountedObj):
"""storage for Commands settings (description + call signature...)
"""
def __init__(self, description, arguments=None, result=None, export=True, optional=False):
def __init__(self, description, arguments=None, result=None, export=True, optional=False, datatype=None, ctr=None):
super(Command, self).__init__()
# descriptive text for humans
self.description = description
# list of datatypes for arguments
self.arguments = arguments or []
self.datatype = CommandType(arguments, result)
self.arguments = arguments
self.result = result
# whether implementation is optional
self.optional = optional
self.export = export
if ctr is not None:
self.ctr = ctr
def __repr__(self):
return '%s_%d(%s)' % (self.__class__.__name__, self.ctr, ', '.join(
@ -175,3 +181,9 @@ class Command(CountedObj):
description=self.description,
datatype = self.datatype.export_datatype(),
)
def copy(self):
# return a copy of ourselfs
params = self.__dict__.copy()
params.pop('ctr')
return Command(**params)

View File

@ -45,6 +45,7 @@ from secop.protocol.messages import Message, EVENTREPLY, IDENTREQUEST
from secop.protocol.errors import SECOPError, NoSuchModuleError, \
NoSuchCommandError, NoSuchParameterError, BadValueError, ReadonlyError
from secop.lib import formatExtendedStack, formatException
from secop.params import Parameter, Command
try:
unicode('a')
@ -156,7 +157,7 @@ class Dispatcher(object):
# omit export=False params!
res = []
for aname, aobj in self.get_module(modulename).accessibles.items():
if aobj.export:
if isinstance(aobj, Command) or aobj.export:
res.extend([aname, aobj.for_export()])
self.log.debug(u'list accessibles for module %s -> %r' %
(modulename, res))
@ -211,8 +212,8 @@ class Dispatcher(object):
if moduleobj is None:
raise NoSuchModuleError(module=modulename)
pobj = moduleobj.parameters.get(pname, None)
if pobj is None:
pobj = moduleobj.accessibles.get(pname, None)
if pobj is None or not isinstance(pobj, Parameter):
raise NoSuchParameterError(module=modulename, parameter=pname)
if pobj.readonly:
raise ReadonlyError(module=modulename, parameter=pname)
@ -232,8 +233,8 @@ class Dispatcher(object):
if moduleobj is None:
raise NoSuchModuleError(module=modulename)
pobj = moduleobj.parameters.get(pname, None)
if pobj is None:
pobj = moduleobj.accessibles.get(pname, None)
if pobj is None or not isinstance(pobj, Parameter):
raise NoSuchParameterError(module=modulename, parameter=pname)
readfunc = getattr(moduleobj, u'read_%s' % pname, None)
@ -379,7 +380,9 @@ class Dispatcher(object):
if moduleobj is None:
self.log.error(u'activate: can not lookup module %r, skipping it' % modulename)
continue
for pname, pobj in moduleobj.parameters.items():
for pname, pobj in moduleobj.accessibles.items():
if not isinstance(pobj, Parameter):
continue
if not pobj.export: # XXX: handle export_as cases!
continue
# can not use announce_update here, as this will send to all clients

View File

@ -39,19 +39,19 @@ class SimBase(object):
if '.extra_params' in cfgdict:
extra_params = cfgdict.pop('.extra_params')
# make a copy of self.parameter
self.parameters = dict((k, v.copy()) for k, v in self.parameters.items())
self.accessibles = dict((k, v.copy()) for k, v in self.accessibles.items())
for k in extra_params.split(','):
k = k.strip()
self.parameters[k] = Parameter('extra_param: %s' % k.strip(),
self.accessibles[k] = Parameter('extra_param: %s' % k.strip(),
datatype=FloatRange(),
default=0.0)
def reader(maxage=0, pname=k):
self.log.debug('simulated reading %s' % pname)
return self.parameters[pname].value
return self.accessibles[pname].value
setattr(self, 'read_' + k, reader)
def writer(newval, pname=k):
self.log.debug('simulated writing %r to %s' % (newval, pname))
self.parameters[pname].value = newval
self.accessibles[pname].value = newval
return newval
setattr(self, 'write_' + k, writer)
@ -70,7 +70,7 @@ class SimBase(object):
return True
def read_value(self, maxage=0):
if 'jitter' in self.parameters:
if 'jitter' in self.accessibles:
return self._value + self.jitter*(0.5-random.random())
return self._value
@ -85,14 +85,14 @@ class SimReadable(SimBase, Readable):
def __init__(self, logger, cfgdict, devname, dispatcher):
SimBase.__init__(self, cfgdict)
Readable.__init__(self, logger, cfgdict, devname, dispatcher)
self._value = self.parameters['value'].default
self._value = self.accessibles['value'].default
class SimWritable(SimBase, Writable):
def __init__(self, logger, cfgdict, devname, dispatcher):
SimBase.__init__(self, cfgdict)
Writable.__init__(self, logger, cfgdict, devname, dispatcher)
self._value = self.parameters['value'].default
self._value = self.accessibles['value'].default
def read_value(self, maxage=0):
return self.target
def write_target(self, value):
@ -103,16 +103,16 @@ class SimDrivable(SimBase, Drivable):
def __init__(self, logger, cfgdict, devname, dispatcher):
SimBase.__init__(self, cfgdict)
Drivable.__init__(self, logger, cfgdict, devname, dispatcher)
self._value = self.parameters['value'].default
self._value = self.accessibles['value'].default
def sim(self):
while self._value == self.target:
sleep(0.3)
self.status = self.Status.BUSY, 'MOVING'
speed = 0
if 'ramp' in self.parameters:
if 'ramp' in self.accessibles:
speed = self.ramp / 60. # ramp is per minute!
elif 'speed' in self.parameters:
elif 'speed' in self.accessibles:
speed = self.speed
if speed == 0:
self._value = self.target

View File

@ -74,7 +74,7 @@ class Switch(Drivable):
return self.Status.BUSY, info
def _update(self):
started = self.parameters['target'].timestamp
started = self.accessibles['target'].timestamp
info = ''
if self.target > self.value:
info = 'waiting for ON'

View File

@ -173,7 +173,7 @@ class GarfieldMagnet(SequencerMixin, Drivable):
def read_abslimits(self, maxage=0):
maxfield = self._current2field(self._currentsource.abslimits[1])
# limit to configured value (if any)
maxfield = min(maxfield, max(self.parameters['abslimits'].default))
maxfield = min(maxfield, max(self.accessibles['abslimits'].default))
return -maxfield, maxfield
def read_ramp(self, maxage=0):

View File

@ -386,7 +386,7 @@ class AnalogInput(PyTangoDevice, Readable):
# prefer configured unit if nothing is set on the Tango device, else
# update
if attrInfo.unit != 'No unit':
self.parameters['value'].unit = attrInfo.unit
self.accessibles['value'].unit = attrInfo.unit
def read_value(self, maxage=0):
return self._dev.value
@ -469,7 +469,7 @@ class AnalogOutput(PyTangoDevice, Drivable):
# prefer configured unit if nothing is set on the Tango device, else
# update
if attrInfo.unit != 'No unit':
self.parameters['value'].unit = attrInfo.unit
self.accessibles['value'].unit = attrInfo.unit
def poll(self, nr=0):
super(AnalogOutput, self).poll(nr)
@ -805,7 +805,7 @@ class NamedDigitalInput(DigitalInput):
super(NamedDigitalInput, self).init()
try:
# pylint: disable=eval-used
self.parameters['value'].datatype = EnumType('value', **eval(self.mapping))
self.accessibles['value'].datatype = EnumType('value', **eval(self.mapping))
except Exception as e:
raise ValueError('Illegal Value for mapping: %r' % e)
@ -829,7 +829,7 @@ class PartialDigitalInput(NamedDigitalInput):
def init(self):
super(PartialDigitalInput, self).init()
self._mask = (1 << self.bitwidth) - 1
# self.parameters['value'].datatype = IntRange(0, self._mask)
# self.accessibles['value'].datatype = IntRange(0, self._mask)
def read_value(self, maxage=0):
raw_value = self._dev.value
@ -872,9 +872,9 @@ class NamedDigitalOutput(DigitalOutput):
super(NamedDigitalOutput, self).init()
try:
# pylint: disable=eval-used
self.parameters['value'].datatype = EnumType('value', **eval(self.mapping))
self.accessibles['value'].datatype = EnumType('value', **eval(self.mapping))
# pylint: disable=eval-used
self.parameters['target'].datatype = EnumType('target', **eval(self.mapping))
self.accessibles['target'].datatype = EnumType('target', **eval(self.mapping))
except Exception as e:
raise ValueError('Illegal Value for mapping: %r' % e)
@ -899,8 +899,8 @@ class PartialDigitalOutput(NamedDigitalOutput):
def init(self):
super(PartialDigitalOutput, self).init()
self._mask = (1 << self.bitwidth) - 1
# self.parameters['value'].datatype = IntRange(0, self._mask)
# self.parameters['target'].datatype = IntRange(0, self._mask)
# self.accessibles['value'].datatype = IntRange(0, self._mask)
# self.accessibles['target'].datatype = IntRange(0, self._mask)
def read_value(self, maxage=0):
raw_value = self._dev.value

104
test/test_modules.py Normal file
View File

@ -0,0 +1,104 @@
# -*- coding: utf-8 -*-
# *****************************************************************************
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Enrico Faulhaber <enrico.faulhaber@frm2.tum.de>
#
# *****************************************************************************
"""test data types."""
from __future__ import print_function
import sys
sys.path.insert(0, sys.path[0] + '/..')
# no fixtures needed
import pytest
from secop.datatypes import BoolType, EnumType
from secop.metaclass import ModuleMeta
from secop.params import Command, Parameter, Override
from secop.modules import Module, Readable, Writable, Drivable, Communicator
def test_Communicator():
logger = type('LoggerStub', (object,), dict(
debug = lambda self, *a: print(*a),
info = lambda self, *a: print(*a),
))()
dispatcher = type('DispatcherStub', (object,), dict(
announce_update = lambda self, m, pn, pv: print('%s:%s=%r' % (m.name, pn, pv)),
))()
o = Communicator(logger, {}, 'o1', dispatcher)
o.init()
def test_ModuleMeta():
newclass = ModuleMeta.__new__(ModuleMeta, 'TestReadable', (Drivable, Writable, Readable, Module), {
"parameters" : {
'param1' : Parameter('param1', datatype=BoolType(), default=False),
'param2': Parameter('param2', datatype=BoolType(), default=True),
},
"commands": {
"cmd": Command('stuff',[BoolType()], BoolType())
},
"accessibles": {
'a1': Parameter('a1', datatype=BoolType(), default=False),
'a2': Parameter('a2', datatype=BoolType(), default=True),
'value':Override(datatype=BoolType(), default = True),
'cmd2': Command('another stuff', [BoolType()], BoolType()),
},
"do_cmd": lambda self, arg: not arg,
"do_cmd2": lambda self, arg: not arg,
"read_param1": lambda self, *args: True,
"read_param2": lambda self, *args: False,
"read_a1": lambda self, *args: True,
"read_a2": lambda self, *args: False,
"read_value": lambda self, *args: True,
"init": lambda self, *args: [None for self.accessibles['value'].datatype in [EnumType('value', OK=1, Bad=2)]],
})
# every cmd/param has to be collected to accessibles
assert newclass.accessibles
with pytest.raises(AttributeError):
assert newclass.commands
with pytest.raises(AttributeError):
assert newclass.parameters
logger = type('LoggerStub', (object,), dict(
debug = lambda self, *a: print(*a),
info = lambda self, *a: print(*a),
))()
dispatcher = type('DispatcherStub', (object,), dict(
announce_update = lambda self, m, pn, pv: print('%s:%s=%r' % (m.name, pn, pv)),
))()
o1 = newclass(logger, {}, 'o1', dispatcher)
o2 = newclass(logger, {}, 'o1', dispatcher)
params_found= set()
ctr_found = set()
for obj in [o1, o2]:
for n, o in obj.accessibles.items():
print(n)
assert o not in params_found
params_found.add(o)
assert o.ctr not in ctr_found
ctr_found.add(o.ctr)
o1.init()
o2.init()

61
test/test_params.py Normal file
View File

@ -0,0 +1,61 @@
# -*- coding: utf-8 -*-
# *****************************************************************************
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Enrico Faulhaber <enrico.faulhaber@frm2.tum.de>
#
# *****************************************************************************
"""test data types."""
import sys
sys.path.insert(0, sys.path[0] + '/..')
# no fixtures needed
import pytest
from secop.datatypes import BoolType
from secop.params import Command, Parameter, Override
def test_Command():
cmd = Command('do_something', [], None)
assert cmd.description
assert cmd.ctr
assert cmd.arguments == []
assert cmd.result is None
def test_Parameter():
p1 = Parameter('description1', datatype=BoolType, default=False)
p2 = Parameter('description2', datatype=BoolType, default=True)
assert p1 != p2
assert p1.ctr != p2.ctr
with pytest.raises(ValueError):
Parameter(None, datatype=float)
def test_Override():
p = Parameter('description1', datatype=BoolType, default=False)
o = Override(default=True)
assert o.ctr != p.ctr
q = o.apply(p)
assert q.ctr == o.ctr # override shall be useable to influence the order, hence copy the ctr value
assert q.ctr != p.ctr
assert o.ctr != p.ctr
assert q != p