fetched mlz version

- before some chamges in the gerrit pipline

Change-Id: I33eb2d75f83345a7039d0fb709e66defefb1c3e0
This commit is contained in:
2023-05-02 11:31:30 +02:00
parent b19a8c2e5c
commit da15df076a
765 changed files with 35890 additions and 59302 deletions

View File

@ -15,7 +15,7 @@ def constants():
del c
@pytest.fixture(scope="session")
# pylint: disable=redefined-builtin
@pytest.fixture(scope="session")
def globals():
return dict()
return {}

View File

@ -20,8 +20,8 @@
#
# *****************************************************************************
from secop.modules import Module, Attached
from secop.protocol.dispatcher import Dispatcher
from frappy.modules import Module, Attached
from frappy.protocol.dispatcher import Dispatcher
# class DispatcherStub:

View File

@ -25,12 +25,12 @@
# no fixtures needed
import pytest
from secop.datatypes import ArrayOf, BLOBType, BoolType, \
CommandType, ConfigError, DataType, Enum, EnumType, FloatRange, \
IntRange, ProgrammingError, ScaledInteger, StatusType, \
StringType, StructOf, TextType, TupleOf, get_datatype, \
DiscouragedConversion
from secop.lib import generalConfig
from frappy.datatypes import ArrayOf, BLOBType, BoolType, CommandType, \
ConfigError, DataType, EnumType, FloatRange, \
IntRange, ProgrammingError, ScaledInteger, StatusType, StringType, \
StructOf, TextType, TupleOf, ValueType, get_datatype
from frappy.errors import BadValueError, RangeError, WrongTypeError
from frappy.lib import generalConfig
def copytest(dt):
@ -54,13 +54,15 @@ def test_FloatRange():
copytest(dt)
assert dt.export_datatype() == {'type': 'double', 'min':-3.14, 'max':3.14}
with pytest.raises(ValueError):
dt(9)
with pytest.raises(ValueError):
dt(-9)
with pytest.raises(ValueError):
with pytest.raises(RangeError):
dt.validate(9)
with pytest.raises(RangeError):
dt.validate(-9)
dt(9) # convert, but do not check limits
dt(-9) # convert, but do not check limits
with pytest.raises(WrongTypeError):
dt('XX')
with pytest.raises(ValueError):
with pytest.raises(WrongTypeError):
dt([19, 'X'])
dt(1)
dt(0)
@ -105,17 +107,19 @@ def test_IntRange():
copytest(dt)
assert dt.export_datatype() == {'type': 'int', 'min':-3, 'max':3}
with pytest.raises(ValueError):
dt(9)
with pytest.raises(ValueError):
dt(-9)
with pytest.raises(ValueError):
with pytest.raises(RangeError):
dt.validate(9)
with pytest.raises(RangeError):
dt.validate(-9)
dt(9) # convert, but do not check limits
dt(-9) # convert, but do not check limits
with pytest.raises(WrongTypeError):
dt('XX')
with pytest.raises(ValueError):
with pytest.raises(WrongTypeError):
dt([19, 'X'])
with pytest.raises(ValueError):
with pytest.raises(WrongTypeError):
dt(1.3)
with pytest.raises(ValueError):
with pytest.raises(WrongTypeError):
dt('1.3')
dt(1)
dt(0)
@ -141,17 +145,19 @@ def test_ScaledInteger():
# serialisation of datatype contains limits on the 'integer' value
assert dt.export_datatype() == {'type': 'scaled', 'scale':0.01, 'min':-300, 'max':300}
with pytest.raises(ValueError):
dt(9)
with pytest.raises(ValueError):
dt(-9)
with pytest.raises(ValueError):
with pytest.raises(RangeError):
dt.validate(9)
with pytest.raises(RangeError):
dt.validate(-9)
dt(9) # convert, but do not check limits
dt(-9) # convert, but do not check limits
with pytest.raises(WrongTypeError):
dt('XX')
with pytest.raises(ValueError):
with pytest.raises(WrongTypeError):
dt([19, 'X'])
dt(1)
dt(0)
with pytest.raises(ValueError):
with pytest.raises(ProgrammingError):
ScaledInteger('xc', 'Yx')
with pytest.raises(ProgrammingError):
ScaledInteger(scale=0, minval=1, maxval=2)
@ -171,27 +177,30 @@ def test_ScaledInteger():
'unit':'A'}
assert dt.absolute_resolution == dt.scale
dt = ScaledInteger(0.003, 0, 1, unit='X', fmtstr='%.1f',
dt = ScaledInteger(0.003, 0.4, 1, unit='X', fmtstr='%.1f',
absolute_resolution=0.001, relative_resolution=1e-5)
copytest(dt)
assert dt.export_datatype() == {'type': 'scaled', 'scale':0.003, 'min':0, 'max':333,
assert dt.export_datatype() == {'type': 'scaled', 'scale':0.003, 'min':133, 'max':333,
'unit':'X', 'fmtstr':'%.1f',
'absolute_resolution':0.001,
'relative_resolution':1e-5}
assert dt(0.4) == 0.399
assert dt.format_value(0.4) == '0.4 X'
assert dt.format_value(0.4, '') == '0.4'
assert dt.format_value(0.4, 'Z') == '0.4 Z'
assert dt(1.0029) == 0.999
with pytest.raises(ValueError):
dt(1.004)
assert round(dt(0.7), 5) == 0.699
assert dt.format_value(0.6) == '0.6 X'
assert dt.format_value(0.6, '') == '0.6'
assert dt.format_value(0.6, 'Z') == '0.6 Z'
assert round(dt.validate(1.0004), 5) == 0.999 # rounded value within limit
with pytest.raises(RangeError):
dt.validate(1.006) # rounded value outside limit
assert round(dt.validate(0.398), 5) == 0.399 # rounded value within rounded limit
with pytest.raises(RangeError):
dt.validate(0.395) # rounded value outside limit
dt.setProperty('min', 1)
dt.setProperty('max', 0)
with pytest.raises(ConfigError):
dt.checkProperties()
with pytest.raises(ValueError):
with pytest.raises(WrongTypeError):
dt.setProperty('scale', None)
@ -205,21 +214,21 @@ def test_EnumType():
dt = EnumType('dt', a=3, c=7, stuff=1)
copytest(dt)
assert dt.export_datatype() == {'type': 'enum', 'members': dict(a=3, c=7, stuff=1)}
assert dt.export_datatype() == {'type': 'enum', 'members': {'a': 3, 'c': 7, 'stuff': 1}}
with pytest.raises(ValueError):
with pytest.raises(RangeError):
dt(9)
with pytest.raises(ValueError):
with pytest.raises(RangeError):
dt(-9)
with pytest.raises(ValueError):
with pytest.raises(RangeError):
dt('XX')
with pytest.raises(ValueError):
with pytest.raises(WrongTypeError):
dt([19, 'X'])
assert dt('a') == 3
assert dt('stuff') == 1
assert dt(1) == 1
with pytest.raises(ValueError):
with pytest.raises(RangeError):
dt(2)
assert dt.export_value('c') == 7
@ -228,9 +237,9 @@ def test_EnumType():
assert dt.import_value('c') == 7
assert dt.import_value('a') == 3
assert dt.import_value('stuff') == 1
with pytest.raises(ValueError):
with pytest.raises(RangeError):
dt.export_value(2)
with pytest.raises(ValueError):
with pytest.raises(RangeError):
dt.import_value('A')
assert dt.format_value(3) == 'a<3>'
@ -249,13 +258,13 @@ def test_BLOBType():
copytest(dt)
assert dt.export_datatype() == {'type': 'blob', 'minbytes':3, 'maxbytes':10}
with pytest.raises(ValueError):
with pytest.raises(WrongTypeError):
dt(9)
with pytest.raises(ValueError):
with pytest.raises(RangeError):
dt(b'av')
with pytest.raises(ValueError):
with pytest.raises(RangeError):
dt(b'abcdefghijklmno')
with pytest.raises(ValueError):
with pytest.raises(WrongTypeError):
dt('abcd')
assert dt(b'abcd') == b'abcd'
@ -287,13 +296,13 @@ def test_StringType():
copytest(dt)
assert dt.export_datatype() == {'type': 'string', 'minchars':4, 'maxchars':11}
with pytest.raises(ValueError):
with pytest.raises(WrongTypeError):
dt(9)
with pytest.raises(ValueError):
with pytest.raises(RangeError):
dt('av')
with pytest.raises(ValueError):
with pytest.raises(RangeError):
dt('abcdefghijklmno')
with pytest.raises(ValueError):
with pytest.raises(RangeError):
dt('abcdefg\0')
assert dt('abcd') == 'abcd'
# tests with bytes have to be added after migration to py3
@ -317,11 +326,11 @@ def test_TextType():
copytest(dt)
assert dt.export_datatype() == {'type': 'string', 'maxchars':12}
with pytest.raises(ValueError):
with pytest.raises(WrongTypeError):
dt(9)
with pytest.raises(ValueError):
with pytest.raises(RangeError):
dt('abcdefghijklmno')
with pytest.raises(ValueError):
with pytest.raises(RangeError):
dt('abcdefg\0')
assert dt('ab\n\ncd\n') == 'ab\n\ncd\n'
# assert dt(b'ab\n\ncd\n') == 'ab\n\ncd\n'
@ -338,9 +347,9 @@ def test_BoolType():
copytest(dt)
assert dt.export_datatype() == {'type': 'bool'}
with pytest.raises(ValueError):
with pytest.raises(WrongTypeError):
dt(9)
with pytest.raises(ValueError):
with pytest.raises(WrongTypeError):
dt('av')
assert dt('true') is True
@ -353,7 +362,7 @@ def test_BoolType():
assert dt.import_value(False) is False
assert dt.import_value(True) is True
with pytest.raises(ValueError):
with pytest.raises(WrongTypeError):
dt.import_value('av')
assert dt.format_value(0) == "False"
@ -366,9 +375,9 @@ def test_BoolType():
def test_ArrayOf():
# test constructor catching illegal arguments
with pytest.raises(ValueError):
with pytest.raises(ProgrammingError):
ArrayOf(int)
with pytest.raises(ValueError):
with pytest.raises(ProgrammingError):
ArrayOf(-3, IntRange(-10,10))
dt = ArrayOf(IntRange(-10, 10), 5)
copytest(dt)
@ -381,9 +390,9 @@ def test_ArrayOf():
assert dt.export_datatype() == {'type': 'array', 'minlen':1, 'maxlen':3,
'members':{'type': 'double', 'min':-10,
'max':10, 'unit': 'Z'}}
with pytest.raises(ValueError):
with pytest.raises(WrongTypeError):
dt(9)
with pytest.raises(ValueError):
with pytest.raises(WrongTypeError):
dt('av')
assert dt([1, 2, 3]) == (1, 2, 3)
@ -410,19 +419,22 @@ def test_ArrayOf():
dt = ArrayOf(EnumType('myenum', single=0), 5)
copytest(dt)
dt = ArrayOf(ArrayOf(FloatRange(unit='m')))
assert dt.format_value([[0, 1], [2, 3]]) == '[[0, 1], [2, 3]] m'
def test_TupleOf():
# test constructor catching illegal arguments
with pytest.raises(ValueError):
with pytest.raises(ProgrammingError):
TupleOf(2)
dt = TupleOf(IntRange(-10, 10), BoolType())
copytest(dt)
assert dt.export_datatype() == {'type': 'tuple',
'members':[{'type': 'int', 'min':-10, 'max':10}, {'type': 'bool'}]}
with pytest.raises(ValueError):
with pytest.raises(WrongTypeError):
dt(9)
with pytest.raises(ValueError):
with pytest.raises(WrongTypeError):
dt([99, 'X'])
assert dt([1, True]) == (1, True)
@ -438,7 +450,7 @@ def test_TupleOf():
def test_StructOf():
# test constructor catching illegal arguments
with pytest.raises(ValueError):
with pytest.raises(ProgrammingError):
StructOf(IntRange) # pylint: disable=E1121
with pytest.raises(ProgrammingError):
StructOf(IntRange=1)
@ -451,21 +463,21 @@ def test_StructOf():
'an_int': {'type': 'int', 'min':0, 'max':999}},
'optional':['an_int']}
with pytest.raises(ValueError):
with pytest.raises(WrongTypeError):
dt(9)
with pytest.raises(ValueError):
with pytest.raises(WrongTypeError):
dt([99, 'X'])
with pytest.raises(ValueError):
dt(dict(a_string='XXX', an_int=1811))
with pytest.raises(RangeError):
dt.validate({'a_string': 'XXX', 'an_int': 1811})
assert dt(dict(a_string='XXX', an_int=8)) == {'a_string': 'XXX',
'an_int': 8}
assert dt({'a_string': 'XXX', 'an_int': 8}) == {'a_string': 'XXX',
'an_int': 8}
assert dt.export_value({'an_int': 13, 'a_string': 'WFEC'}) == {
'a_string': 'WFEC', 'an_int': 13}
assert dt.import_value({'an_int': 13, 'a_string': 'WFEC'}) == {
'a_string': 'WFEC', 'an_int': 13}
assert dt.format_value({'an_int':2, 'a_string':'Z'}) == "{a_string='Z', an_int=2}"
assert dt.format_value({'an_int': 2, 'a_string': 'Z'}) == "{an_int=2, a_string='Z'}"
dt = StructOf(['optionalmember'], optionalmember=EnumType('myenum', single=0))
copytest(dt)
@ -485,38 +497,50 @@ def test_Command():
def test_StatusType():
status_codes = Enum('Status', IDLE=100, WARN=200, BUSY=300, ERROR=400)
dt = StatusType(status_codes)
assert dt.IDLE == status_codes.IDLE
assert dt.ERROR == status_codes.ERROR
assert dt._enum == status_codes
dt = StatusType('IDLE', 'WARN', 'ERROR', 'DISABLED')
assert dt.IDLE == StatusType.IDLE == 100
assert dt.ERROR == StatusType.ERROR == 400
dt2 = StatusType(None, IDLE=100, WARN=200, ERROR=400, DISABLED=0)
assert dt2.export_datatype() == dt.export_datatype()
dt3 = StatusType(dt.enum)
assert dt3.export_datatype() == dt.export_datatype()
with pytest.raises(ProgrammingError):
StatusType('__init__') # built in attribute of StatusType
with pytest.raises(ProgrammingError):
StatusType(dt.enum, 'custom') # not a standard attribute
StatusType(dt.enum, custom=499) # o.k., if value is given
def test_get_datatype():
with pytest.raises(ValueError):
with pytest.raises(WrongTypeError):
get_datatype(1)
with pytest.raises(ValueError):
with pytest.raises(WrongTypeError):
get_datatype(True)
with pytest.raises(ValueError):
with pytest.raises(WrongTypeError):
get_datatype(str)
with pytest.raises(ValueError):
with pytest.raises(WrongTypeError):
get_datatype({'undefined': {}})
assert isinstance(get_datatype({'type': 'bool'}), BoolType)
with pytest.raises(ValueError):
with pytest.raises(WrongTypeError):
get_datatype(['bool'])
with pytest.raises(ValueError):
with pytest.raises(WrongTypeError):
get_datatype({'type': 'int', 'min':-10}) # missing max
with pytest.raises(ValueError):
with pytest.raises(WrongTypeError):
get_datatype({'type': 'int', 'max':10}) # missing min
assert isinstance(get_datatype({'type': 'int', 'min':-10, 'max':10}), IntRange)
with pytest.raises(ValueError):
with pytest.raises(WrongTypeError):
get_datatype({'type': 'int', 'min':10, 'max':-10}) # min > max
with pytest.raises(ValueError):
with pytest.raises(WrongTypeError):
get_datatype({'type': 'int'}) # missing limits
with pytest.raises(ValueError):
with pytest.raises(WrongTypeError):
get_datatype({'type': 'int', 'x': 2})
assert isinstance(get_datatype({'type': 'double'}), FloatRange)
@ -525,16 +549,16 @@ def test_get_datatype():
assert isinstance(get_datatype({'type': 'double', 'min':-9.9, 'max':11.1}),
FloatRange)
with pytest.raises(ValueError):
with pytest.raises(WrongTypeError):
get_datatype(['double'])
with pytest.raises(ValueError):
with pytest.raises(WrongTypeError):
get_datatype({'type': 'double', 'min':10, 'max':-10})
with pytest.raises(ValueError):
with pytest.raises(WrongTypeError):
get_datatype(['double', {}, 2])
with pytest.raises(ValueError):
with pytest.raises(WrongTypeError):
get_datatype({'type': 'scaled', 'scale':0.01, 'min':-2.718})
with pytest.raises(ValueError):
with pytest.raises(WrongTypeError):
get_datatype({'type': 'scaled', 'scale':0.02, 'max':3.14})
assert isinstance(get_datatype(
{'type': 'scaled', 'scale':0.03, 'min':-99, 'max':111}), ScaledInteger)
@ -543,45 +567,45 @@ def test_get_datatype():
assert dt.export_datatype() == {'type': 'scaled', 'max':330, 'min':0, 'scale':0.03}
assert get_datatype(dt.export_datatype()).export_datatype() == dt.export_datatype()
with pytest.raises(ValueError):
with pytest.raises(WrongTypeError):
get_datatype(['scaled']) # dict missing
with pytest.raises(ValueError):
with pytest.raises(WrongTypeError):
get_datatype({'type': 'scaled', 'min':-10, 'max':10}) # no scale
with pytest.raises(ValueError):
with pytest.raises(WrongTypeError):
get_datatype({'type': 'scaled', 'min':10, 'max':-10, 'scale': 1}) # limits reversed
with pytest.raises(ValueError):
with pytest.raises(WrongTypeError):
get_datatype(['scaled', {'min':10, 'max':-10, 'scale': 1}, 2])
with pytest.raises(ValueError):
with pytest.raises(WrongTypeError):
get_datatype(['enum'])
with pytest.raises(ValueError):
with pytest.raises(WrongTypeError):
get_datatype({'type': 'enum', 'a': -2})
assert isinstance(get_datatype({'type': 'enum', 'members':dict(a=-2)}), EnumType)
assert isinstance(get_datatype({'type': 'enum', 'members': {'a': -2}}), EnumType)
assert isinstance(get_datatype({'type': 'blob', 'maxbytes':1}), BLOBType)
assert isinstance(get_datatype({'type': 'blob', 'minbytes':1, 'maxbytes':10}), BLOBType)
with pytest.raises(ValueError):
with pytest.raises(WrongTypeError):
get_datatype({'type': 'blob', 'minbytes':10, 'maxbytes':1})
with pytest.raises(ValueError):
with pytest.raises(WrongTypeError):
get_datatype({'type': 'blob', 'minbytes':10, 'maxbytes':-10})
with pytest.raises(ValueError):
with pytest.raises(WrongTypeError):
get_datatype(['blob', {'maxbytes':10}, 'x'])
assert isinstance(get_datatype({'type': 'string', 'maxchars':1}), StringType)
assert isinstance(get_datatype({'type': 'string', 'maxchars':1}), StringType)
assert isinstance(get_datatype({'type': 'string', 'minchars':1, 'maxchars':10}), StringType)
with pytest.raises(ValueError):
with pytest.raises(WrongTypeError):
get_datatype({'type': 'string', 'minchars':10, 'maxchars':1})
with pytest.raises(ValueError):
with pytest.raises(WrongTypeError):
get_datatype({'type': 'string', 'minchars':10, 'maxchars':-10})
with pytest.raises(ValueError):
with pytest.raises(WrongTypeError):
get_datatype(['string', {'maxchars':-0}, 'x'])
with pytest.raises(ValueError):
with pytest.raises(WrongTypeError):
get_datatype(['array'])
with pytest.raises(ValueError):
with pytest.raises(WrongTypeError):
get_datatype({'type': 'array', 'members': [1]})
assert isinstance(get_datatype({'type': 'array', 'minlen':1, 'maxlen':1,
'members':{'type': 'blob', 'maxbytes':1}}
@ -590,43 +614,43 @@ def test_get_datatype():
'members':{'type': 'blob', 'maxbytes':1}}
).members, BLOBType)
with pytest.raises(ValueError):
with pytest.raises(WrongTypeError):
get_datatype({'type': 'array', 'members':{'type': 'blob', 'maxbytes':1}, 'minbytes':-10})
with pytest.raises(ValueError):
with pytest.raises(WrongTypeError):
get_datatype({'type': 'array', 'members':{'type': 'blob', 'maxbytes':1},
'min':10, 'max':1})
with pytest.raises(ValueError):
get_datatype({'type': 'array', 'blob': dict(max=4), 'maxbytes': 10})
with pytest.raises(WrongTypeError):
get_datatype({'type': 'array', 'blob': {'max': 4}, 'maxbytes': 10})
with pytest.raises(ValueError):
with pytest.raises(WrongTypeError):
get_datatype(['tuple'])
with pytest.raises(ValueError):
with pytest.raises(WrongTypeError):
get_datatype(['tuple', [1], 2, 3])
assert isinstance(get_datatype(
{'type': 'tuple', 'members':[{'type': 'blob', 'maxbytes':1}]}), TupleOf)
assert isinstance(get_datatype(
{'type': 'tuple', 'members':[{'type': 'blob', 'maxbytes':1}]}).members[0], BLOBType)
with pytest.raises(ValueError):
with pytest.raises(WrongTypeError):
get_datatype({'type': 'tuple', 'members': {}})
with pytest.raises(ValueError):
with pytest.raises(WrongTypeError):
get_datatype(['tuple', 10, -10])
assert isinstance(get_datatype({'type': 'tuple', 'members':[{'type': 'blob', 'maxbytes':1},
{'type': 'bool'}]}), TupleOf)
with pytest.raises(ValueError):
with pytest.raises(WrongTypeError):
get_datatype(['struct'])
with pytest.raises(ValueError):
with pytest.raises(WrongTypeError):
get_datatype(['struct', [1], 2, 3])
assert isinstance(get_datatype({'type': 'struct', 'members':
{'name': {'type': 'blob', 'maxbytes':1}}}), StructOf)
assert isinstance(get_datatype({'type': 'struct', 'members':
{'name': {'type': 'blob', 'maxbytes':1}}}).members['name'], BLOBType)
with pytest.raises(ValueError):
with pytest.raises(WrongTypeError):
get_datatype({'type': 'struct', 'members': {}})
with pytest.raises(ValueError):
with pytest.raises(WrongTypeError):
get_datatype({'type': 'struct', 'members':[1,2,3]})
@ -638,11 +662,11 @@ def test_get_datatype():
(StringType(10, 10), StringType()),
(ArrayOf(StringType(), 3, 5), ArrayOf(StringType(), 3, 6)),
(TupleOf(StringType(), BoolType()), TupleOf(StringType(), IntRange())),
(StructOf(a=FloatRange(-1,1)), StructOf(a=FloatRange(), b=BoolType(), optional=['b'])),
(StructOf(a=FloatRange(-1,1), b=BoolType()), StructOf(a=FloatRange(), b=BoolType(), optional=['b'])),
])
def test_oneway_compatible(dt, contained_in):
dt.compatible(contained_in)
with pytest.raises(ValueError):
with pytest.raises(BadValueError):
contained_in.compatible(dt)
@ -667,9 +691,9 @@ def test_twoway_compatible(dt1, dt2):
(StructOf(a=FloatRange(-1, 1), b=StringType()), StructOf(a=FloatRange(), b=BoolType())),
])
def test_incompatible(dt1, dt2):
with pytest.raises(ValueError):
with pytest.raises(BadValueError):
dt1.compatible(dt2)
with pytest.raises(ValueError):
with pytest.raises(BadValueError):
dt2.compatible(dt1)
@ -678,5 +702,58 @@ def test_lazy_validation(dt):
generalConfig.defaults['lazy_number_validation'] = True
dt('0')
generalConfig.defaults['lazy_number_validation'] = False
with pytest.raises(DiscouragedConversion):
with pytest.raises(WrongTypeError):
dt('0')
mytuple = TupleOf(ScaledInteger(0.1, 0, 10, unit='$'), FloatRange(unit='$/min'))
myarray = ArrayOf(mytuple)
@pytest.mark.parametrize('unit, dt', [
('m', FloatRange(unit='$/sec')),
('A', mytuple),
('V', myarray),
('X', StructOf(a=myarray, b=mytuple)),
])
def test_main_unit(unit, dt):
fixed_dt = dt.copy()
fixed_dt.set_main_unit(unit)
before = repr(dt.export_datatype())
after = repr(fixed_dt.export_datatype())
assert '$' in before
assert before != after
assert before.replace('$', unit) == after
def ex_validator(i):
if i > 10:
raise RuntimeError('too large')
return i
@pytest.mark.parametrize('validator, value, result', [
(dict, [('a', 1)], {'a': 1}),
(ex_validator, 5, 5),
# pylint: disable=unnecessary-lambda
(lambda x: dict(x), {'a': 1}, {'a': 1}),
# pylint: disable=unnecessary-lambda
(lambda i: ex_validator(i) * 3, 3, 9),
])
def test_value_type(validator, value, result):
t = ValueType()
tv = ValueType(validator)
assert t(value) == value
assert tv(value) == result
@pytest.mark.parametrize('validator, value', [
(dict, 'strinput'),
(ex_validator, 20),
# pylint: disable=unnecessary-lambda
(lambda i: list(i), 1),
])
def test_value_type_rejecting(validator, value):
t = ValueType()
tv = ValueType(validator)
assert t(value) == value
with pytest.raises(ConfigError):
tv(value)

View File

@ -21,10 +21,28 @@
# *****************************************************************************
"""test data types."""
import secop.errors
import pytest
from frappy.errors import RangeError, WrongTypeError, ProgrammingError, \
ConfigError, InternalError, TimeoutSECoPError, secop_error, make_secop_error
def test_errors():
"""check consistence of secop.errors.EXCEPTIONS"""
for e in secop.errors.EXCEPTIONS.values():
assert secop.errors.EXCEPTIONS[e().name] == e
@pytest.mark.parametrize('exc, name, text, echk', [
(RangeError('out of range'), 'RangeError', 'out of range', None),
(WrongTypeError('bad type'), 'WrongType', 'bad type', None),
(ProgrammingError('x'), 'InternalError', 'ProgrammingError: x', None),
(ConfigError('y'), 'InternalError', 'ConfigError: y', None),
(InternalError('z'), 'InternalError', 'z', None),
(TimeoutSECoPError('t'), 'TimeoutError', 't', None),
(ValueError('v'), 'InternalError', "ValueError: v", InternalError("ValueError: v")),
(None, 'InternalError', "UnknownError: v", InternalError("UnknownError: v")),
])
def test_errors(exc, name, text, echk):
"""check consistence of frappy.errors"""
if exc:
err = secop_error(exc)
assert err.name == name
assert str(err) == text
recheck = make_secop_error(name, text)
echk = echk or exc
assert type(recheck) == type(echk)
assert str(recheck) == str(echk)

View File

@ -21,9 +21,10 @@
# *****************************************************************************
from secop.rwhandler import ReadHandler, WriteHandler, \
from frappy.rwhandler import ReadHandler, WriteHandler, \
CommonReadHandler, CommonWriteHandler, nopoll
from secop.core import Module, Parameter, FloatRange, Done
from frappy.core import Module, Parameter, FloatRange
from frappy.lib import generalConfig
class DispatcherStub:
@ -31,9 +32,9 @@ class DispatcherStub:
# initial value from the timestamp. However, in the test below
# the second update happens after the updates dict is cleared
# -> we have to inhibit the 'omit unchanged update' feature
omit_unchanged_within = 0
def __init__(self, updates):
generalConfig.testinit(omit_unchanged_within=0)
self.updates = updates
def announce_update(self, modulename, pname, pobj):
@ -62,6 +63,7 @@ class ServerStub:
class ModuleTest(Module):
def __init__(self, updates=None, **opts):
opts['description'] = ''
opts = {p: {'value': val} for p, val in opts.items()}
super().__init__('mod', logger, opts, ServerStub(updates or {}))
@ -83,10 +85,9 @@ def test_handler():
data.append(pname)
return value
assert Mod.read_a.poll is True
assert Mod.read_b.poll is True
m = Mod()
assert m.read_a.poll is True
assert m.read_b.poll is True
data.append(1.2)
assert m.read_a() == 1.2
@ -104,11 +105,11 @@ def test_handler():
assert m.b == 7
assert data.pop() == 'b'
data.append(Done)
data.append(m.b)
assert m.read_b() == 7
assert data.pop() == 'b'
assert data == []
assert not data
def test_common_handler():
@ -129,15 +130,16 @@ def test_common_handler():
self.b = values['b']
data.append('write_hdl')
assert set([Mod.read_a.poll, Mod.read_b.poll]) == {True, False}
m = Mod(a=1, b=2)
assert set([m.read_a.poll, m.read_b.poll]) == {True, False}
assert m.writeDict == {'a': 1, 'b': 2}
m.write_a(3)
assert m.a == 3
assert m.b == 2
assert data.pop() == 'write_hdl'
assert m.writeDict == {}
assert not m.writeDict
m.write_b(4)
assert m.a == 3
@ -159,7 +161,7 @@ def test_common_handler():
assert m.b == 2.2
assert data.pop() == 'read_hdl'
assert data == []
assert not data
def test_nopoll():
@ -171,8 +173,10 @@ def test_nopoll():
def read_hdl(self):
pass
assert Mod1.read_a.poll is True
assert Mod1.read_b.poll is True
m = Mod1()
print(m, m.read_a)
assert m.read_a.poll is True
assert m.read_b.poll is True
class Mod2(ModuleTest):
a = Parameter('', FloatRange(), readonly=False)
@ -182,8 +186,9 @@ def test_nopoll():
def read_hdl(self):
pass
assert Mod2.read_a.poll is True
assert Mod2.read_b.poll is False
m = Mod2()
assert m.read_a.poll is True
assert m.read_b.poll is False
class Mod3(ModuleTest):
a = Parameter('', FloatRange(), readonly=False)
@ -194,8 +199,9 @@ def test_nopoll():
def read_hdl(self):
pass
assert Mod3.read_a.poll is False
assert Mod3.read_b.poll is False
m = Mod3()
assert m.read_a.poll is False
assert m.read_b.poll is False
class Mod4(ModuleTest):
a = Parameter('', FloatRange(), readonly=False)
@ -206,8 +212,9 @@ def test_nopoll():
def read_hdl(self):
pass
assert Mod4.read_a.poll is False
assert Mod4.read_b.poll is False
m = Mod4()
assert m.read_a.poll is False
assert m.read_b.poll is False
class Mod5(ModuleTest):
a = Parameter('', FloatRange(), readonly=False)
@ -218,8 +225,9 @@ def test_nopoll():
def read_hdl(self):
pass
assert Mod5.read_a.poll is False
assert Mod5.read_b.poll is False
m = Mod5()
assert m.read_a.poll is False
assert m.read_b.poll is False
class Mod6(ModuleTest):
a = Parameter('', FloatRange(), readonly=False)
@ -230,5 +238,6 @@ def test_nopoll():
def read_hdl(self):
pass
assert Mod6.read_a.poll is False
assert Mod6.read_b.poll is False
m = Mod6()
assert m.read_a.poll is False
assert m.read_b.poll is False

View File

@ -1,202 +0,0 @@
# -*- coding: utf-8 -*-
# *****************************************************************************
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
#
# *****************************************************************************
"""test commandhandler."""
import pytest
from secop.datatypes import FloatRange, IntRange, Property, StringType
from secop.errors import ProgrammingError
from secop.iohandler import CmdParser, IOHandler
from secop.modules import Module, Parameter
@pytest.mark.parametrize('fmt, text, values, text2', [
('%d,%d', '2,3', [2,3], None),
('%c%s', '222', ['2', '22'], None), # %c does not need a spearator
('%s%%%d_%%', 'abc%12_%', ['abc', 12], None), # % as separator and within separator
('%s.+', 'string,without+period.+', ['string,without+period',], None), # special characters
('%d,%.3f,%x,%o', '1,1.2346,fF,17', [1, 1.2346, 255, 15], '1,1.235,ff,17'), # special formats
])
def test_CmdParser(fmt, text, values, text2):
parser = CmdParser(fmt)
print(parser.format(*values))
assert parser.parse(text) == values
if text2 is None:
text2 = text
assert parser.format(*values) == text2
def test_CmdParser_ex():
with pytest.raises(ValueError):
CmdParser('%d%s') # no separator
class Data:
"""a cyclic list where we put data to be checked or used during test"""
def __init__(self):
self.data = []
def push(self, tag, *args):
if len(args) == 1:
args = args[0]
self.data.append((tag, args))
def pop(self, expected):
tag, data = self.data.pop(0)
print('pop(%s) %r' % (tag, data))
if tag != expected:
raise ValueError('expected tag %s, not %s' % (expected, tag))
return data
def empty(self):
return not self.data
class DispatcherStub:
# the first update from the poller comes a very short time after the
# initial value from the timestamp. However, in the test below
# the second update happens after the updates dict is cleared
# -> we have to inhibit the 'omit unchanged update' feature
omit_unchanged_within = 0
def __init__(self, updates):
self.updates = updates
def announce_update(self, module, pname, pobj):
if pobj.readerror:
self.updates['error', pname] = str(pobj.readerror)
else:
self.updates[pname] = pobj.value
class LoggerStub:
def debug(self, *args):
pass
info = warning = exception = debug
class ServerStub:
def __init__(self, updates):
self.dispatcher = DispatcherStub(updates)
def test_IOHandler():
class Hdl(IOHandler):
CMDARGS = ['channel', 'loop']
CMDSEPARATOR ='|'
def __init__(self, name, querycmd, replyfmt):
changecmd = querycmd.replace('?', ' ')
if not querycmd.endswith('?'):
changecmd += ','
super().__init__(name, querycmd, replyfmt, changecmd)
group1 = Hdl('group1', 'SIMPLE?', '%g')
group2 = Hdl('group2', 'CMD?%(channel)d', '%g,%s,%d')
class Module1(Module):
channel = Property('the channel', IntRange(), default=3)
loop = Property('the loop', IntRange(), default=2)
simple = Parameter('a readonly', FloatRange(), default=0.77, handler=group1)
real = Parameter('a float value', FloatRange(), default=12.3, handler=group2, readonly=False)
text = Parameter('a string value', StringType(), default='x', handler=group2, readonly=False)
def communicate(self, command):
assert data.pop('command') == command
return data.pop('reply')
def analyze_group1(self, val):
assert data.pop('val') == val
return dict(simple=data.pop('simple'))
def analyze_group2(self, gval, sval, dval):
assert data.pop('gsv') == (gval, sval, dval)
real, text = data.pop('rt')
return dict(real=real, text=text)
def change_group2(self, change):
gval, sval, dval = change.readValues()
assert data.pop('old') == (gval, sval, dval)
assert data.pop('self') == (self.real, self.text)
assert data.pop('new') == (change.real, change.text)
return data.pop('changed')
data = Data()
updates = {}
module = Module1('mymodule', LoggerStub(), {'.description': ''}, ServerStub(updates))
print(updates)
updates.clear() # get rid of updates from initialisation
# for communicate
data.push('command', 'SIMPLE?')
data.push('reply', '4.51')
# for analyze_group1
data.push('val', 4.51)
data.push('simple', 45.1)
value = module.read_simple()
assert value == 45.1
assert module.simple == value
assert data.empty()
assert updates.pop('simple') == 45.1
assert not updates
# for communicate
data.push('command', 'CMD?3')
data.push('reply', '1.23,text,5')
# for analyze_group2
data.push('gsv', 1.23, 'text', 5)
data.push('rt', 12.3, 'string')
value = module.read_real()
assert module.real == value
assert module.real == updates.pop('real') == 12.3
assert module.text == updates.pop('text') == 'string'
assert data.empty()
assert not updates
# for communicate
data.push('command', 'CMD?3')
data.push('reply', '1.23,text,5')
# for analyze_group2
data.push('gsv', 1.23, 'text', 5)
data.push('rt', 12.3, 'string')
# for change_group2
data.push('old', 1.23, 'text', 5)
data.push('self', 12.3, 'string')
data.push('new', 12.3, 'FOO')
data.push('changed', 1.23, 'foo', 9)
# for communicate
data.push('command', 'CMD 3,1.23,foo,9|CMD?3')
data.push('reply', '1.23,foo,9')
# for analyze_group2
data.push('gsv', 1.23, 'foo', 9)
data.push('rt', 12.3, 'FOO')
value = module.write_text('FOO')
assert module.text == value
assert module.text == updates.pop('text') == 'FOO'
assert module.real == updates.pop('real') == 12.3
assert data.empty()
assert not updates
with pytest.raises(ProgrammingError): # can not use a handler for different modules
# pylint: disable=unused-variable
class Module2(Module):
simple = Parameter('a readonly', FloatRange(), default=0.77, handler=group1)

41
test/test_lib.py Normal file
View File

@ -0,0 +1,41 @@
# -*- coding: utf-8 -*-
# *****************************************************************************
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
#
# *****************************************************************************
import pytest
from frappy.lib import parseHostPort
@pytest.mark.parametrize('hostport, defaultport, result', [
(('box.psi.ch', 9999), 1, ('box.psi.ch', 9999)),
(('/dev/tty', 9999), 1, None),
('localhost:10767', 1, ('localhost', 10767)),
('www.psi.ch', 80, ('www.psi.ch', 80)),
('/dev/ttyx:2089', 10767, None),
('COM4:', 2089, None),
('underscore_valid.123.hyphen-valid.com', 80, ('underscore_valid.123.hyphen-valid.com', 80)),
])
def test_parse_host(hostport, defaultport, result):
if result is None:
with pytest.raises(ValueError):
parseHostPort(hostport, defaultport)
else:
assert result == parseHostPort(hostport, defaultport)

View File

@ -25,7 +25,7 @@
# no fixtures needed
import pytest
from secop.lib.enum import Enum, EnumMember
from frappy.lib.enum import Enum, EnumMember
def test_EnumMember():
@ -49,7 +49,7 @@ def test_EnumMember():
a += 2
# this shall work
assert 2 == (a + 1) # pylint: disable=C0122
assert 2 == (a + 1)
assert (a - 1) == 0
assert a
assert a + a
@ -83,3 +83,12 @@ def test_Enum_bool():
e = Enum('OffOn', off=0, on=1)
assert bool(e(0)) is False
assert bool(e(1)) is True
def test_Enum_duplicate():
e = Enum('x', a=1, b=2)
Enum(e, b=2, c=3) # matching duplicate
with pytest.raises(TypeError):
Enum(e, b=3, c=4) # duplicate name with value mismatch
with pytest.raises(TypeError):
Enum(e, c=1) # duplicate value with name mismatch

View File

@ -22,11 +22,11 @@
import pytest
import mlzlog
from secop.modules import Module
from secop.protocol.dispatcher import Dispatcher
from secop.protocol.interface import encode_msg_frame, decode_msg
import secop.logging
from secop.logging import logger, generalConfig, HasComlog
from frappy.modules import Module
from frappy.protocol.dispatcher import Dispatcher
from frappy.protocol.interface import encode_msg_frame, decode_msg
import frappy.logging
from frappy.logging import logger, generalConfig, HasComlog
class ServerStub:
@ -54,6 +54,7 @@ class Connection:
@pytest.fixture(name='init')
def init_(monkeypatch):
# pylint: disable=unnecessary-dunder-call
logger.__init__()
class Playground:
@ -88,8 +89,8 @@ def init_(monkeypatch):
close = flush = emit = noop
monkeypatch.setattr(mlzlog, 'ColoredConsoleHandler', ConsoleHandler)
monkeypatch.setattr(secop.logging, 'ComLogfileHandler', ComLogHandler)
monkeypatch.setattr(secop.logging, 'LogfileHandler', LogfileHandler)
monkeypatch.setattr(frappy.logging, 'ComLogfileHandler', ComLogHandler)
monkeypatch.setattr(frappy.logging, 'LogfileHandler', LogfileHandler)
class Mod(Module):
result = []

View File

@ -26,12 +26,12 @@ import sys
import threading
import pytest
from secop.datatypes import BoolType, FloatRange, StringType, IntRange, ScaledInteger
from secop.errors import ProgrammingError, ConfigError
from secop.modules import Communicator, Drivable, Readable, Module
from secop.params import Command, Parameter
from secop.rwhandler import ReadHandler, WriteHandler, nopoll
from secop.lib import generalConfig
from frappy.datatypes import BoolType, FloatRange, StringType, IntRange, ScaledInteger
from frappy.errors import ProgrammingError, ConfigError, RangeError
from frappy.modules import Communicator, Drivable, Readable, Module
from frappy.params import Command, Parameter
from frappy.rwhandler import ReadHandler, WriteHandler, nopoll
from frappy.lib import generalConfig
class DispatcherStub:
@ -39,9 +39,9 @@ class DispatcherStub:
# initial value from the timestamp. However, in the test below
# the second update happens after the updates dict is cleared
# -> we have to inhibit the 'omit unchanged update' feature
omit_unchanged_within = 0
def __init__(self, updates):
generalConfig.testinit(omit_unchanged_within=0)
self.updates = updates
def announce_update(self, modulename, pname, pobj):
@ -77,7 +77,7 @@ class DummyMultiEvent(threading.Event):
def test_Communicator():
o = Communicator('communicator', LoggerStub(), {'.description': ''}, ServerStub({}))
o = Communicator('communicator', LoggerStub(), {'description': ''}, ServerStub({}))
o.earlyInit()
o.initModule()
event = DummyMultiEvent()
@ -151,8 +151,8 @@ def test_ModuleMagic():
a1 = Parameter(datatype=FloatRange(unit='$/s'), readonly=False)
# remark: it might be a programming error to override the datatype
# and not overriding the read_* method. This is not checked!
b2 = Parameter('<b2>', datatype=StringType(), default='empty',
readonly=False, initwrite=True)
b2 = Parameter('<b2>', datatype=StringType(), value='empty',
readonly=False)
def write_a1(self, value):
self._a1_written = value
@ -177,8 +177,8 @@ def test_ModuleMagic():
objects = []
for newclass, sortcheck in [(Newclass1, sortcheck1), (Newclass2, sortcheck2)]:
o1 = newclass('o1', logger, {'.description':''}, srv)
o2 = newclass('o2', logger, {'.description':''}, srv)
o1 = newclass('o1', logger, {'description':''}, srv)
o2 = newclass('o2', logger, {'description':''}, srv)
for obj in [o1, o2]:
objects.append(obj)
for o in obj.accessibles.values():
@ -187,12 +187,14 @@ def test_ModuleMagic():
params_found.add(o)
assert list(obj.accessibles) == sortcheck
updates.clear()
# check for inital updates working properly
o1 = Newclass1('o1', logger, {'.description':''}, srv)
o1 = Newclass1('o1', logger, {'description':''}, srv)
expectedBeforeStart = {'target': '', 'status': (Drivable.Status.IDLE, ''),
'param1': False, 'param2': 1.0, 'a1': 0.0, 'a2': True, 'pollinterval': 5.0,
'param1': False, 'param2': 1.0, 'a1': False, 'a2': True, 'pollinterval': 5.0,
'value': 'first'}
assert updates.pop('o1') == expectedBeforeStart
for k, v in expectedBeforeStart.items():
assert getattr(o1, k) == v
o1.earlyInit()
event = DummyMultiEvent()
o1.initModule()
@ -205,11 +207,11 @@ def test_ModuleMagic():
assert updates.pop('o1') == expectedAfterStart
# check in addition if parameters are written
o2 = Newclass2('o2', logger, {'.description':'', 'a1': 2.7}, srv)
# no update for b2, as this has to be written
expectedBeforeStart['a1'] = 2.7
expectedBeforeStart['target'] = 0.0
assert updates.pop('o2') == expectedBeforeStart
assert not updates
o2 = Newclass2('o2', logger, {'description':'', 'a1': {'value': 2.7}}, srv)
expectedBeforeStart.update(a1=2.7, b2='empty', target=0, value=0)
for k, v in expectedBeforeStart.items():
assert getattr(o2, k) == v
o2.earlyInit()
event = DummyMultiEvent()
o2.initModule()
@ -224,10 +226,10 @@ def test_ModuleMagic():
assert not updates
o1 = Newclass1('o1', logger, {'.description':''}, srv)
o2 = Newclass2('o2', logger, {'.description':''}, srv)
o1 = Newclass1('o1', logger, {'description':''}, srv)
o2 = Newclass2('o2', logger, {'description':''}, srv)
assert o2.parameters['a1'].datatype.unit == 'deg/s'
o2 = Newclass2('o2', logger, {'.description':'', 'value.unit':'mm', 'param2.unit':'mm'}, srv)
o2 = Newclass2('o2', logger, {'description':'', 'value':{'unit':'mm'},'param2':{'unit':'mm'}}, srv)
# check datatype is not shared
assert o1.parameters['param2'].datatype.unit == 'Ohm'
assert o2.parameters['param2'].datatype.unit == 'mm'
@ -235,15 +237,15 @@ def test_ModuleMagic():
assert o2.parameters['a1'].datatype.unit == 'mm/s'
cfg = Newclass2.configurables
assert set(cfg.keys()) == {
'export', 'group', 'description', 'disable_value_range_check',
'export', 'group', 'description', 'features',
'meaning', 'visibility', 'implementation', 'interface_classes', 'target', 'stop',
'status', 'param1', 'param2', 'cmd', 'a2', 'pollinterval', 'slowinterval', 'b2',
'cmd2', 'value', 'a1'}
'cmd2', 'value', 'a1', 'omit_unchanged_within'}
assert set(cfg['value'].keys()) == {
'group', 'export', 'relative_resolution',
'visibility', 'unit', 'default', 'datatype', 'fmtstr',
'visibility', 'unit', 'default', 'value', 'datatype', 'fmtstr',
'absolute_resolution', 'max', 'min', 'readonly', 'constant',
'description', 'needscfg'}
'description', 'needscfg', 'update_unchanged'}
# check on the level of classes
# this checks Newclass1 too, as it is inherited by Newclass2
@ -374,13 +376,13 @@ def test_command_check():
with pytest.raises(ProgrammingError):
BadDatatype('o', logger, {
'description': '',
'cmd.argument': {'type': 'double', 'min': 1, 'max': 0},
'cmd': {'argument': {'type': 'double', 'min': 1, 'max': 0}},
}, srv)
with pytest.raises(ProgrammingError):
BadDatatype('o', logger, {
'description': '',
'cmd.visibility': 'invalid',
'cmd': {'visibility': 'invalid'},
}, srv)
@ -413,17 +415,15 @@ def test_mixin():
MixedDrivable('o', logger, {
'description': '',
'param1.description': 'param 1',
'param1': 0,
'param2.datatype': {"type": "double"},
'param1': {'value': 0, 'description': 'param1'},
'param2': {'datatype': {"type": "double"}},
}, srv)
with pytest.raises(ConfigError):
MixedReadable('o', logger, {
'description': '',
'param1.description': 'param 1',
'param1': 0,
'param2.datatype': {"type": "double"},
'param1': {'value': 0, 'description': 'param1'},
'param2': {'datatype': {"type": "double"}},
}, srv)
@ -434,7 +434,7 @@ def test_override():
def stop(self):
"""no decorator needed"""
assert Mod.value.default == 5
assert Mod.value.value == 5
assert Mod.stop.description == "no decorator needed"
class Mod2(Drivable):
@ -455,7 +455,7 @@ def test_command_config():
srv = ServerStub({})
mod = Mod('o', logger, {
'description': '',
'convert.argument': {'type': 'bool'},
'convert': {'argument': {'type': 'bool'}},
}, srv)
assert mod.commands['convert'].datatype.export_datatype() == {
'type': 'command',
@ -465,7 +465,7 @@ def test_command_config():
mod = Mod('o', logger, {
'description': '',
'convert.datatype': {'type': 'command', 'argument': {'type': 'bool'}, 'result': {'type': 'bool'}},
'convert': {'datatype': {'type': 'command', 'argument': {'type': 'bool'}, 'result': {'type': 'bool'}}},
}, srv)
assert mod.commands['convert'].datatype.export_datatype() == {
'type': 'command',
@ -529,7 +529,7 @@ def test_generic_access():
updates = {}
srv = ServerStub(updates)
obj = Mod('obj', logger, {'description': '', 'param': 'initial value'}, srv)
obj = Mod('obj', logger, {'description': '', 'param': {'value':'initial value'}}, srv)
assert obj.param == 'initial value'
assert obj.write_param('Cheese') == 'cheese'
assert obj.write_unhandled('Cheese') == 'Cheese'
@ -542,7 +542,7 @@ def test_generic_access():
assert obj.read_unhandled()
assert updates == {'obj': {'param': 'potato'}}
updates.clear()
assert updates == {}
assert not updates
def test_duplicate_handler_name():
@ -590,7 +590,7 @@ def test_no_read_write():
updates = {}
srv = ServerStub(updates)
obj = Mod('obj', logger, {'description': '', 'param': 'cheese'}, srv)
obj = Mod('obj', logger, {'description': '', 'param': {'value': 'cheese'}}, srv)
assert obj.param == 'cheese'
assert obj.read_param() == 'cheese'
assert updates == {'obj': {'param': 'cheese'}}
@ -630,32 +630,275 @@ def test_problematic_value_range():
srv = ServerStub({})
obj = Mod('obj', logger, {'description': '', 'value.max': 10.1}, srv) # pylint: disable=unused-variable
obj = Mod('obj', logger, {'description': '', 'value':{'max': 10.1}}, srv) # pylint: disable=unused-variable
with pytest.raises(ConfigError):
obj = Mod('obj', logger, {'description': ''}, srv)
obj = Mod('obj', logger, {'description': '', 'value.max': 9.9}, srv)
class Mod2(Drivable):
value = Parameter('', FloatRange(), default=0)
target = Parameter('', FloatRange(), default=0)
obj = Mod2('obj', logger, {'description': ''}, srv)
obj = Mod2('obj', logger, {'description': '', 'target.min': 0, 'target.max': 10}, srv)
obj = Mod2('obj', logger, {'description': '', 'target':{'min': 0, 'max': 10}}, srv)
with pytest.raises(ConfigError):
obj = Mod('obj', logger, {
'value.min': 0, 'value.max': 10,
'target.min': 0, 'target.max': 10, 'description': ''}, srv)
obj = Mod('obj', logger, {'disable_value_range_check': True,
'value.min': 0, 'value.max': 10,
'target.min': 0, 'target.max': 10, 'description': ''}, srv)
generalConfig.defaults['disable_value_range_check'] = True
obj = Mod('obj', logger, {
'value': {'min': 0, 'max': 10},
'target': {'min': 0, 'max': 10}, 'description': ''}, srv)
class Mod4(Drivable):
value = Parameter('', FloatRange(0, 10), default=0)
target = Parameter('', FloatRange(0, 10), default=0)
obj = Mod4('obj', logger, {
'value.min': 0, 'value.max': 10,
'target.min': 0, 'target.max': 10, 'description': ''}, srv)
'value': {'min': 0, 'max': 10},
'target': {'min': 0, 'max': 10}, 'description': ''}, srv)
@pytest.mark.parametrize('config, dynamicunit, finalunit, someunit', [
({}, 'K', 'K', 'K'),
({'value':{'unit': 'K'}}, 'C', 'C', 'C'),
({'value':{'unit': 'K'}}, '', 'K', 'K'),
({'value':{'unit': 'K'}, 'someparam':{'unit': 'A'}}, 'C', 'C', 'A'),
])
def test_deferred_main_unit(config, dynamicunit, finalunit, someunit):
# this pattern is used in frappy_mlz.entangle.AnalogInput
class Mod(Drivable):
ramp = Parameter('', datatype=FloatRange(unit='$/min'))
someparam = Parameter('', datatype=FloatRange(unit='$'))
__main_unit = None
def applyMainUnit(self, mainunit):
# called from __init__ method
# replacement of '$' by main unit must be done later
self.__main_unit = mainunit
def startModule(self, start_events):
super().startModule(start_events)
if dynamicunit:
self.accessibles['value'].datatype.setProperty('unit', dynamicunit)
self.__main_unit = dynamicunit
if self.__main_unit:
super().applyMainUnit(self.__main_unit)
srv = ServerStub({})
m = Mod('m', logger, {'description': '', **config}, srv)
m.startModule(None)
assert m.parameters['value'].datatype.unit == finalunit
assert m.parameters['target'].datatype.unit == finalunit
assert m.parameters['ramp'].datatype.unit == finalunit + '/min'
# when someparam.unit is configured, this differs from finalunit
assert m.parameters['someparam'].datatype.unit == someunit
def test_super_call():
class Base(Readable):
def read_status(self):
return Readable.Status.IDLE, 'base'
class Mod(Base):
def read_status(self):
code, text = super().read_status()
return code, text + ' (extended)'
class DispatcherStub1:
def __init__(self, updates):
self.updates = updates
def announce_update(self, modulename, pname, pobj):
if pobj.readerror:
raise pobj.readerror
self.updates.append((modulename, pname, pobj.value))
class ServerStub1:
def __init__(self, updates):
self.dispatcher = DispatcherStub1(updates)
updates = []
srv = ServerStub1(updates)
b = Base('b', logger, {'description': ''}, srv)
b.read_status()
assert updates == [('b', 'status', ('IDLE', 'base'))]
updates.clear()
m = Mod('m', logger, {'description': ''}, srv)
m.read_status()
# in the version before change 'allow super calls on read_/write_ methods'
# updates would contain two items
assert updates == [('m', 'status', ('IDLE', 'base (extended)'))]
assert type(m).__name__ == '_Mod'
assert type(m).__mro__[1:5] == (Mod, Base, Readable, Module)
def test_write_method_returns_none():
class Mod(Module):
a = Parameter('', FloatRange(), readonly=False)
def write_a(self, value):
return None
mod = Mod('mod', LoggerStub(), {'description': ''}, ServerStub({}))
mod.write_a(1.5)
assert mod.a == 1.5
@pytest.mark.parametrize('arg, value', [
('always', 0),
(0, 0),
('never', 999999999),
(999999999, 999999999),
('default', 0.25),
(1, 1),
])
def test_update_unchanged_ok(arg, value):
srv = ServerStub({})
generalConfig.testinit(omit_unchanged_within=0.25) # override value from DispatcherStub
class Mod1(Module):
a = Parameter('', FloatRange(), default=0, update_unchanged=arg)
mod1 = Mod1('mod1', LoggerStub(), {'description': ''}, srv)
par = mod1.parameters['a']
assert par.omit_unchanged_within == value
assert Mod1.a.omit_unchanged_within == 0
class Mod2(Module):
a = Parameter('', FloatRange(), default=0)
mod2 = Mod2('mod2', LoggerStub(), {'description': '', 'a': {'update_unchanged': arg}}, srv)
par = mod2.parameters['a']
assert par.omit_unchanged_within == value
assert Mod2.a.omit_unchanged_within == 0
def test_omit_unchanged_within():
srv = ServerStub({})
generalConfig.testinit(omit_unchanged_within=0.25) # override call from DispatcherStub
class Mod(Module):
a = Parameter('', FloatRange())
mod1 = Mod('mod1', LoggerStub(), {'description': ''}, srv)
assert mod1.parameters['a'].omit_unchanged_within == 0.25
mod2 = Mod('mod2', LoggerStub(), {'description': '', 'omit_unchanged_within': 0.125}, srv)
assert mod2.parameters['a'].omit_unchanged_within == 0.125
stdlim = {
'a_min': -1, 'a_max': 2,
'b_min': 0,
'c_max': 10,
'd_limits': (-1, 1),
}
class Lim(Module):
a = Parameter('', FloatRange(-10, 10), readonly=False, default=0)
a_min = Parameter()
a_max = Parameter()
b = Parameter('', FloatRange(0, None), readonly=False, default=0)
b_min = Parameter()
c = Parameter('', IntRange(None, 100), readonly=False, default=0)
c_max = Parameter()
d = Parameter('', FloatRange(-5, 5), readonly=False, default=0)
d_limits = Parameter()
e = Parameter('', IntRange(0, 8), readonly=False, default=0)
def check_e(self, value):
if value % 2:
raise RangeError('e must not be odd')
def test_limit_defaults():
srv = ServerStub({})
mod = Lim('mod', LoggerStub(), {'description': 'test'}, srv)
assert mod.a_min == -10
assert mod.a_max == 10
assert isinstance(mod.a_min, float)
assert isinstance(mod.a_max, float)
assert mod.b_min == 0
assert isinstance(mod.b_min, float)
assert mod.c_max == 100
assert isinstance(mod.c_max, int)
assert mod.d_limits == (-5, 5)
assert isinstance(mod.d_limits[0], float)
assert isinstance(mod.d_limits[1], float)
@pytest.mark.parametrize('limits, pname, good, bad', [
(stdlim, 'a', [-1, 2, 0], [-2, 3]),
(stdlim, 'b', [0, 1e99], [-1, -1e99]),
(stdlim, 'c', [-999, 0, 10], [11, 999]),
(stdlim, 'd', [-1, 0.1, 1], [-1.001, 1.001]),
({'a_min': 0, 'a_max': -1}, 'a', [], [0, -1]),
(stdlim, 'e', [0, 2, 4, 6, 8], [-1, 1, 7, 9]),
])
def test_limits(limits, pname, good, bad):
srv = ServerStub({})
mod = Lim('mod', LoggerStub(), {'description': 'test'}, srv)
mod.check_a = 0 # this should not harm. check_a is never called on the instance
for k, v in limits.items():
setattr(mod, k, v)
for v in good:
getattr(mod, 'write_' + pname)(v)
for v in bad:
with pytest.raises(RangeError):
getattr(mod, 'write_' + pname)(v)
def test_limit_inheritance():
srv = ServerStub({})
class Base(Module):
a = Parameter('', FloatRange(), readonly=False, default=0)
def check_a(self, value):
if int(value * 4) != value * 4:
raise ValueError('value is not a multiple of 0.25')
class Mixin:
a_min = Parameter()
a_max = Parameter()
class Mod(Mixin, Base):
def check_a(self, value):
if value == 0:
raise ValueError('value must not be 0')
mod = Mod('mod', LoggerStub(), {'description': 'test', 'a_min': {'value': -1}, 'a_max': {'value': 1}}, srv)
for good in [-1, -0.75, 0.25, 1]:
mod.write_a(good)
for bad in [-2, -0.1, 0, 0.9, 1.1]:
with pytest.raises(ValueError):
mod.write_a(bad)
class Mod2(Mixin, Base):
def check_a(self, value):
if value == 0:
raise ValueError('value must not be 0')
return True # indicates stop checking
mod2 = Mod2('mod2', LoggerStub(), {'description': 'test', 'a_min': {'value': -1}, 'a_max': {'value': 1}}, srv)
for good in [-2, -1, -0.75, 0.25, 1, 1.1]:
mod2.write_a(good)
with pytest.raises(ValueError):
mod2.write_a(0)

View File

@ -23,8 +23,8 @@
import pytest
import secop.protocol.messages as m
from secop.protocol.interface import decode_msg, encode_msg_frame
import frappy.protocol.messages as m
from frappy.protocol.interface import decode_msg, encode_msg_frame
# args are: msg tuple, msg bytes
MSG = [
@ -40,7 +40,7 @@ MSG = [
[(m.EVENTREPLY, 'mod:par', [123, dict(t=12.25)]), b'update mod:par [123, {"t": 12.25}]'],
[(m.HEARTBEATREQUEST, '0', None), b'ping 0'],
[(m.HEARTBEATREPLY, None, [None, dict(t=11.75)]), b'pong [null, {"t": 11.75}]'],
[(m.ERRORPREFIX + m.WRITEREQUEST, 'm:p', ['ErrClass', 'text', dict()]),
[(m.ERRORPREFIX + m.WRITEREQUEST, 'm:p', ['ErrClass', 'text', {}]),
b'error_change m:p ["ErrClass", "text", {}]'],
]
@pytest.mark.parametrize('msg, line', MSG)

View File

@ -21,7 +21,7 @@
# *****************************************************************************
import time
from secop.lib.multievent import MultiEvent
from frappy.lib.multievent import MultiEvent
def test_without_timeout():

View File

@ -25,10 +25,10 @@
# no fixtures needed
import pytest
from secop.datatypes import BoolType, FloatRange, IntRange
from secop.errors import ProgrammingError
from secop.modules import HasAccessibles
from secop.params import Command, Parameter
from frappy.datatypes import BoolType, FloatRange, IntRange
from frappy.errors import ProgrammingError
from frappy.modules import HasAccessibles
from frappy.params import Command, Parameter
def test_Command():
@ -109,3 +109,21 @@ def test_Export():
class Mod(HasAccessibles):
param = Parameter('description1', datatype=BoolType, default=False)
assert Mod.param.export == '_param'
@pytest.mark.parametrize('arg, value', [
('always', 0),
(0, 0),
('never', 999999999),
(999999999, 999999999),
(1, 1),
])
def test_update_unchanged_ok(arg, value):
par = Parameter('', datatype=FloatRange(), default=0, update_unchanged=arg)
assert par.update_unchanged == value
@pytest.mark.parametrize('arg', ['alws', '', -2, -0.1, None])
def test_update_unchanged_fail(arg):
with pytest.raises(ProgrammingError):
Parameter('', datatype=FloatRange(), default=0, update_unchanged=arg)

View File

@ -27,7 +27,7 @@ from collections import OrderedDict
import pytest
from secop.parse import Parser
from frappy.parse import Parser
# pylint: disable=redefined-outer-name

112
test/test_persistent.py Normal file
View File

@ -0,0 +1,112 @@
# -*- coding: utf-8 -*-
# *****************************************************************************
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
#
# *****************************************************************************
import json
import os
from os.path import join
import pytest
from frappy.config import Param
from frappy.core import Module, ScaledInteger, IntRange, StringType, StructOf
from frappy.lib import generalConfig
from frappy.persistent import PersistentParam, PersistentMixin
class DispatcherStub:
def announce_update(self, modulename, pname, pobj):
pass
class LoggerStub:
def debug(self, fmt, *args):
print(fmt % args)
info = warning = exception = error = debug
handlers = []
logger = LoggerStub()
class ServerStub:
def __init__(self, equipment_id):
self.dispatcher = DispatcherStub()
self.dispatcher.equipment_id = equipment_id
class Mod(PersistentMixin, Module):
flt = PersistentParam('', ScaledInteger(0.1), default=1.0)
stc = PersistentParam('', StructOf(i=IntRange(0, 10), s=StringType()))
def write_flt(self, value):
return value
def write_stc(self, value):
return value
save_tests = [
({'flt': Param(5.5), 'stc': Param({'i': 3, 's': 'test'})},
{'flt': 55, 'stc': {'i': 3, 's': 'test'}}),
({'flt': Param(5.5)},
{'flt': 55, 'stc': {'i': 0, 's': ''}}), # saved default values
]
@pytest.mark.parametrize('cfg, data', save_tests)
def test_save(tmpdir, cfg, data):
generalConfig.logdir = tmpdir
cfg['description'] = ''
m = Mod('m', logger, cfg, ServerStub('savetest'))
assert m.writeDict == {k: getattr(m, k) for k in data}
m.writeDict.clear() # clear in order to indicate writing has happened
m.saveParameters()
with open(join(tmpdir, 'persistent', 'savetest.m.json'), encoding='utf-8') as f:
assert json.load(f) == data
load_tests = [
# check that value from cfg is overriding value from persistent file
({'flt': Param(5.5), 'stc': Param({'i': 3, 's': 'test'})},
{'flt': 33, 'stc': {'i': 1, 's': 'bar'}},
{'flt': 5.5, 'stc': {'i': 3, 's': 'test'}}),
# check that value from file is taken when not in cfg
({'flt': Param(3.5)},
{'flt': 35, 'stc': {'i': 2, 's': ''}},
{'flt': 3.5, 'stc': {'i': 2, 's': ''}}),
# check default is written when neither cfg is given nor persistent values present
({},
{},
{'flt': 1.0, 'stc': {'i': 0, 's': ''}}),
]
@pytest.mark.parametrize('cfg, data, written', load_tests)
def test_load(tmpdir, cfg, data, written):
generalConfig.logdir = tmpdir
os.makedirs(join(tmpdir, 'persistent'), exist_ok=True)
with open(join(tmpdir, 'persistent', 'loadtest.m.json'), 'w', encoding='utf-8') as f:
json.dump(data, f)
cfg['description'] = ''
m = Mod('m', logger, cfg, ServerStub('loadtest'))
assert m.writeDict == written
# parameter given in config must override values from file
for k, v in cfg.items():
assert getattr(m, k) == v['value']
for k, v in written.items():
assert getattr(m, k) == v

View File

@ -29,8 +29,9 @@ import logging
import pytest
from secop.core import Module, Parameter, FloatRange, Readable, ReadHandler, nopoll
from secop.lib.multievent import MultiEvent
from frappy.core import Module, Parameter, FloatRange, Readable, ReadHandler, nopoll
from frappy.lib.multievent import MultiEvent
from frappy.lib import generalConfig
class Time:
@ -66,13 +67,14 @@ class DispatcherStub:
class ServerStub:
def __init__(self):
generalConfig.testinit()
self.dispatcher = DispatcherStub()
class Base(Module):
def __init__(self):
srv = ServerStub()
super().__init__('mod', logging.getLogger('dummy'), dict(description=''), srv)
super().__init__('mod', logging.getLogger('dummy'), {'description': ''}, srv)
self.dispatcher = srv.dispatcher
def run(self, maxcycles):
@ -132,6 +134,7 @@ def test_poll(ncycles, pollinterval, slowinterval, mspan, pspan, monkeypatch):
m.pollinterval = pollinterval
m.slowInterval = slowinterval
m.run(ncycles)
print(getattr(m.parameters['param4'], 'stat', None))
assert not hasattr(m.parameters['param4'], 'stat')
for pname in ['value', 'status']:
pobj = m.parameters[pname]

View File

@ -23,10 +23,10 @@
import pytest
from secop.datatypes import FloatRange, IntRange, StringType, ValueType
from secop.errors import BadValueError, ConfigError, ProgrammingError
from secop.properties import HasProperties, Property
from secop.core import Parameter
from frappy.datatypes import FloatRange, IntRange, StringType, ValueType
from frappy.errors import RangeError, ConfigError, ProgrammingError
from frappy.properties import HasProperties, Property
from frappy.core import Parameter
def Prop(*args, name=None, **kwds):
@ -35,6 +35,7 @@ def Prop(*args, name=None, **kwds):
# Property(description, datatype, default, ...)
# pylint: disable=use-dict-literal
V_test_Property = [
[Prop(StringType(), 'default', extname='extname', mandatory=False),
dict(default='default', extname='extname', export=True, mandatory=False)
@ -99,7 +100,7 @@ def test_Properties():
assert Cls.aa.extname == '_aa'
cc = Cls()
with pytest.raises(BadValueError):
with pytest.raises(RangeError):
cc.aa = 137
assert Cls.bb.default == 0

View File

@ -21,18 +21,31 @@
# *****************************************************************************
from secop.lib.statemachine import StateMachine, Stop, Retry
from frappy.core import Drivable, Parameter
from frappy.datatypes import StatusType, Enum
from frappy.states import StateMachine, Stop, Retry, Finish, Start, HasStates, status_code
from frappy.lib import generalConfig
class LoggerStub:
def info(self, fmt, *args):
print(fmt % args)
def debug(self, fmt, *args):
pass
warning = exception = error = info
handlers = []
def rise(state):
state.step += 1
print('rise', state.step)
if state.init:
state.status = 'rise'
state.level += 1
if state.level > 3:
return turn
return Retry()
return Retry
def turn(state):
@ -42,7 +55,7 @@ def turn(state):
state.direction += 1
if state.direction > 3:
return fall
return Retry()
return Retry
def fall(state):
@ -52,32 +65,35 @@ def fall(state):
state.level -= 1
if state.level < 0:
raise ValueError('crash')
return Retry(0) # retry until crash!
return fall # retry until crash!
def error_handler(state):
state.last_error_name = type(state.last_error).__name__
def finish(state):
return Finish
class LoggerStub:
def debug(self, fmt, *args):
print(fmt % args)
info = warning = exception = error = debug
handlers = []
class Result:
cleanup_reason = None
def __init__(self):
self.states = []
class DummyThread:
def is_alive(self):
return True
def on_error(self, sm):
self.cleanup_reason = sm.cleanup_reason
def on_transition(self, sm, newstate):
self.states.append(newstate)
def test_fun():
s = StateMachine(step=0, status='', threaded=False, logger=LoggerStub())
obj = Result()
s = StateMachine(step=0, status='', transition=obj.on_transition, logger=LoggerStub())
assert s.step == 0
assert s.status == ''
s.cycle() # do nothing
assert s.step == 0
s.start(rise, level=0, direction=0)
s.start(rise, cleanup=obj.on_error, level=0, direction=0)
s.cycle()
for i in range(1, 4):
assert s.status == 'rise'
assert s.step == i
@ -91,56 +107,217 @@ def test_fun():
assert s.direction == i - 4
s.cycle()
s.cycle() # -> crash
assert isinstance(s.last_error, ValueError)
assert str(s.last_error) == 'crash'
assert s.state is None
assert isinstance(obj.cleanup_reason, ValueError)
assert str(obj.cleanup_reason) == 'crash'
assert obj.states == [rise, turn, fall, fall, fall, fall, fall, None]
assert s.statefunc is None
def test_max_chain():
s = StateMachine(step=0, status='', threaded=False, logger=LoggerStub())
s.start(fall, level=999+1, direction=0)
assert isinstance(s.last_error, RuntimeError)
assert s.state is None
obj = Result()
s = StateMachine(step=0, status='', transition=obj.on_transition, logger=LoggerStub())
s.start(fall, cleanup=obj.on_error, level=999+1, direction=0)
s.cycle()
assert isinstance(obj.cleanup_reason, RuntimeError)
assert s.statefunc is None
def test_stop():
s = StateMachine(step=0, status='', threaded=False, logger=LoggerStub())
s.start(rise, level=0, direction=0)
for _ in range(1, 3):
obj = Result()
s = StateMachine(step=0, status='', transition=obj.on_transition, logger=LoggerStub())
s.start(rise, cleanup=obj.on_error, level=0, direction=0)
for _ in range(3):
s.cycle()
s.stop()
s.cycle()
assert s.last_error is Stop
assert s.state is None
assert isinstance(obj.cleanup_reason, Stop)
assert obj.states == [rise, None]
assert s.statefunc is None
def test_std_error_handling():
s = StateMachine(step=0, status='', threaded=False, logger=LoggerStub())
s.start(rise, level=0, direction=0)
def test_error_handling():
obj = Result()
s = StateMachine(step=0, status='', transition=obj.on_transition, logger=LoggerStub())
s.start(rise, cleanup=obj.on_error, level=0, direction=0)
s.cycle()
s.level = None # -> TypeError on next step
s.cycle()
assert s.state is None # default error handler: stop machine
assert isinstance(s.last_error, TypeError)
assert not hasattr(s, 'last_error_name')
def test_default_error_handling():
s = StateMachine(step=0, status='', cleanup=error_handler, threaded=False, logger=LoggerStub())
s.start(rise, level=0, direction=0)
s.cycle()
s.level = None
s.cycle()
assert s.state is None
assert s.last_error_name == 'TypeError'
assert isinstance(s.last_error, TypeError)
assert isinstance(obj.cleanup_reason, TypeError)
assert obj.states == [rise, None]
assert s.statefunc is None
def test_cleanup_on_restart():
s = StateMachine(step=0, status='', threaded=False, logger=LoggerStub())
s.start(rise, level=0, direction=0)
def test_on_restart():
obj = Result()
s = StateMachine(step=0, status='', transition=obj.on_transition, logger=LoggerStub())
s.start(rise, cleanup=obj.on_error, level=0, direction=0)
s.cycle()
s.cycle()
s.start(turn)
s.cycle()
assert s.state is turn
assert s.last_error is None
assert isinstance(obj.cleanup_reason, Start)
obj.cleanup_reason = None
s.cycle()
assert s.statefunc is turn
assert obj.cleanup_reason is None
assert obj.states == [rise, None, turn]
def test_finish():
obj = Result()
s = StateMachine(step=0, status='', transition=obj.on_transition, logger=LoggerStub())
s.start(finish, cleanup=obj.on_error, level=0, direction=0)
s.cycle()
s.cycle()
assert obj.states == [finish, None]
assert s.statefunc is None
assert s.cleanup_reason is None
Status = Enum(
Drivable.Status,
PREPARED=150,
PREPARING=340,
RAMPING=370,
STABILIZING=380,
FINALIZING=390,
)
class DispatcherStub:
# the first update from the poller comes a very short time after the
# initial value from the timestamp. However, in the test below
# the second update happens after the updates dict is cleared
# -> we have to inhibit the 'omit unchanged update' feature
def __init__(self, updates):
generalConfig.testinit(omit_unchanged_within=0)
self.updates = updates
def announce_update(self, modulename, pname, pobj):
assert modulename == 'obj'
if pobj.readerror:
self.updates.append((pname, pobj.readerror))
else:
self.updates.append((pname, pobj.value))
class ServerStub:
def __init__(self, updates):
self.dispatcher = DispatcherStub(updates)
class Mod(HasStates, Drivable):
status = Parameter(datatype=StatusType(Status))
_my_time = 0
def artificial_time(self):
return self._my_time
def state_transition(self, sm, newstate):
self.statelist.append(getattr(newstate, '__name__', None))
super().state_transition(sm, newstate)
def state_one(self, sm):
if sm.init:
return Retry
return self.state_two
@status_code('PREPARING', 'state 2')
def state_two(self, sm):
return self.state_three
@status_code('FINALIZING')
def state_three(self, sm):
if sm.init:
return Retry
return self.final_status('IDLE', 'finished')
def cleanup_one(self, sm):
self.statelist.append('cleanup one')
return self.cleanup_two
@status_code('BUSY', 'after cleanup')
def cleanup_two(self, sm):
if sm.init:
return Retry
return Finish
def doPoll(self):
super().doPoll()
self._my_time += 1
def create_module():
updates = []
obj = Mod('obj', LoggerStub(), {'description': ''}, ServerStub(updates))
obj.initModule()
obj.statelist = []
try:
obj._Module__pollThread(obj.polledModules, None)
except TypeError:
pass # None is not callable
updates.clear()
return obj, updates
def test_updates():
obj, updates = create_module()
obj.start_machine(obj.state_one)
for _ in range(10):
obj.doPoll()
assert updates == [
('status', (Status.BUSY, 'state one')), # default: BUSY, function name without '_'
('status', (Status.PREPARING, 'state 2')), # explicitly given
('status', (Status.FINALIZING, 'state three')), # only code given
('status', (Status.IDLE, 'finished')),
]
def test_stop_without_cleanup():
obj, updates = create_module()
obj.start_machine(obj.state_one)
obj.doPoll()
obj.stop_machine()
for _ in range(10):
obj.doPoll()
assert updates == [
('status', (Status.BUSY, 'state one')),
('status', (Status.BUSY, 'stopping')),
('status', (Status.IDLE, 'stopped')),
]
assert obj.statelist == ['state_one', None]
def test_stop_with_cleanup():
obj, updates = create_module()
obj.start_machine(obj.state_one, cleanup=obj.cleanup_one)
obj.doPoll()
obj.stop_machine()
for _ in range(10):
obj.doPoll()
assert obj.statelist == ['state_one', 'cleanup one', 'cleanup_two', None]
assert updates == [
('status', (Status.BUSY, 'state one')),
('status', (Status.BUSY, 'stopping')),
('status', (Status.BUSY, 'stopping (after cleanup)')),
('status', (Status.IDLE, 'stopped')),
]
def test_all_restart():
obj, updates = create_module()
obj.start_machine(obj.state_one, cleanup=obj.cleanup_one, statelist=[])
obj.doPoll()
obj.start_machine(obj.state_three)
for _ in range(10):
obj.doPoll()
assert obj.statelist == ['state_one', 'cleanup one', 'cleanup_two', None, 'state_three', None]
assert updates == [
('status', (Status.BUSY, 'state one')),
('status', (Status.FINALIZING, 'restarting')),
('status', (Status.FINALIZING, 'restarting (after cleanup)')),
('status', (Status.FINALIZING, 'state three')),
('status', (Status.IDLE, 'finished')),
]

View File

@ -0,0 +1,75 @@
# -*- coding: utf-8 -*-
# *****************************************************************************
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
#
# *****************************************************************************
from frappy.core import Readable, Drivable, StatusType
from frappy_ess.epics import EpicsReadable, EpicsDrivable
readable_codes = {m.name: m.value for m in Readable.Status.members}
drivable_codes = {m.name: m.value for m in Drivable.Status.members}
def test_entangle_status():
try:
# pylint: disable=import-outside-toplevel
# pylint: disable=unused-import
import PyTango
except ImportError:
return
# pylint: disable=import-outside-toplevel
from frappy_mlz.entangle import AnalogInput, AnalogOutput, TemperatureController
assert AnalogInput.status.datatype.members[0].export_datatype() == {
"type": "enum",
"members": {"UNKNOWN": StatusType.UNKNOWN,
"DISABLED": StatusType.DISABLED,
**readable_codes},
}
assert AnalogOutput.status.datatype.members[0].export_datatype() == {
"type": "enum",
"members": {"UNKNOWN": StatusType.UNKNOWN,
"DISABLED": StatusType.DISABLED,
"UNSTABLE": StatusType.UNSTABLE,
**drivable_codes},
}
assert TemperatureController.status.datatype.members[0].export_datatype() == {
"type": "enum",
"members": {"UNKNOWN": StatusType.UNKNOWN,
"DISABLED": StatusType.DISABLED,
"UNSTABLE": StatusType.UNSTABLE,
**drivable_codes},
}
def test_epics_status():
assert EpicsReadable.status.datatype.members[0].export_datatype() == {
"type": "enum",
"members": {"UNKNOWN": StatusType.UNKNOWN,
**readable_codes},
}
assert EpicsDrivable.status.datatype.members[0].export_datatype() == {
"type": "enum",
"members": {"UNKNOWN": StatusType.UNKNOWN,
**drivable_codes},
}