- when applying overrides with reorder=True, take ctr from Override, else copy from the cloned Accesible. This did not work properly - reworked: - replaced CountedObj class by object_counter - accessibles created by a copy or by applying Overrides do not need fresh counted values - adjusted tests Change-Id: Id2fcf1ab1295aa1ea80ea81ae8cd02d36f86e969 Reviewed-on: https://forge.frm2.tum.de/review/c/sine2020/secop/playground/+/24926 Tested-by: Jenkins Automated Tests <pedersen+jenkins@frm2.tum.de> Reviewed-by: Enrico Faulhaber <enrico.faulhaber@frm2.tum.de> Reviewed-by: Markus Zolliker <markus.zolliker@psi.ch>
224 lines
8.1 KiB
Python
224 lines
8.1 KiB
Python
# -*- coding: utf-8 -*-
|
|
# *****************************************************************************
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify it under
|
|
# the terms of the GNU General Public License as published by the Free Software
|
|
# Foundation; either version 2 of the License, or (at your option) any later
|
|
# version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful, but WITHOUT
|
|
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
|
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
|
|
# details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License along with
|
|
# this program; if not, write to the Free Software Foundation, Inc.,
|
|
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
|
#
|
|
# Module authors:
|
|
# Enrico Faulhaber <enrico.faulhaber@frm2.tum.de>
|
|
# Markus Zolliker <markus.zolliker@psi.ch>
|
|
#
|
|
# *****************************************************************************
|
|
"""test data types."""
|
|
|
|
# no fixtures needed
|
|
#import pytest
|
|
|
|
import threading
|
|
from secop.datatypes import BoolType, FloatRange, StringType
|
|
from secop.modules import Communicator, Drivable, Module
|
|
from secop.params import Command, Override, Parameter
|
|
from secop.poller import BasicPoller
|
|
|
|
|
|
class DispatcherStub:
|
|
def __init__(self, updates):
|
|
self.updates = updates
|
|
|
|
def announce_update(self, modulename, pname, pobj):
|
|
self.updates.setdefault(modulename, {})
|
|
if pobj.readerror:
|
|
self.updates[modulename]['error', pname] = str(pobj.readerror)
|
|
else:
|
|
self.updates[modulename][pname] = pobj.value
|
|
|
|
|
|
class LoggerStub:
|
|
def debug(self, *args):
|
|
print(*args)
|
|
info = warning = exception = debug
|
|
|
|
|
|
class ServerStub:
|
|
def __init__(self, updates):
|
|
self.dispatcher = DispatcherStub(updates)
|
|
|
|
|
|
def test_Communicator():
|
|
o = Communicator('communicator', LoggerStub(), {'.description':''}, ServerStub({}))
|
|
o.earlyInit()
|
|
o.initModule()
|
|
event = threading.Event()
|
|
o.startModule(event.set)
|
|
assert event.is_set() # event should be set immediately
|
|
|
|
|
|
def test_ModuleMeta():
|
|
class Newclass1(Drivable):
|
|
parameters = {
|
|
'pollinterval': Override(reorder=True),
|
|
'param1' : Parameter('param1', datatype=BoolType(), default=False),
|
|
'param2': Parameter('param2', datatype=FloatRange(unit='Ohm'), default=True),
|
|
"cmd": Command('stuff', argument=BoolType(), result=BoolType())
|
|
}
|
|
commands = {
|
|
# intermixing parameters with commands is not recommended,
|
|
# but acceptable for influencing the order
|
|
'a1': Parameter('a1', datatype=BoolType(), default=False),
|
|
'a2': Parameter('a2', datatype=BoolType(), default=True),
|
|
'value': Override(datatype=StringType(), default='first'),
|
|
'cmd2': Command('another stuff', argument=BoolType(), result=BoolType()),
|
|
}
|
|
pollerClass = BasicPoller
|
|
|
|
def do_cmd(self, arg):
|
|
return not arg
|
|
|
|
def do_cmd2(self, arg):
|
|
return not arg
|
|
|
|
def read_param1(self):
|
|
return True
|
|
|
|
def read_param2(self):
|
|
return False
|
|
|
|
def read_a1(self):
|
|
return True
|
|
|
|
def read_a2(self):
|
|
return True
|
|
|
|
def read_value(self):
|
|
return 'second'
|
|
|
|
|
|
# first inherited accessibles, then Overrides with reorder=True and new accessibles
|
|
sortcheck1 = ['value', 'status', 'target', 'pollinterval',
|
|
'param1', 'param2', 'cmd', 'a1', 'a2', 'cmd2']
|
|
|
|
class Newclass2(Newclass1):
|
|
parameters = {
|
|
'cmd2': Override('another stuff'),
|
|
'value': Override(datatype=FloatRange(unit='deg'), reorder=True),
|
|
'a1': Override(datatype=FloatRange(unit='$/s'), reorder=True, readonly=False),
|
|
'b2': Parameter('<b2>', datatype=BoolType(), default=True,
|
|
poll=True, readonly=False, initwrite=True),
|
|
}
|
|
|
|
def write_a1(self, value):
|
|
self._a1_written = value
|
|
return value
|
|
|
|
def write_b2(self, value):
|
|
self._b2_written = value
|
|
return value
|
|
|
|
def read_value(self):
|
|
return 0
|
|
|
|
sortcheck2 = ['status', 'target', 'pollinterval',
|
|
'param1', 'param2', 'cmd', 'a2', 'cmd2', 'value', 'a1', 'b2']
|
|
|
|
logger = LoggerStub()
|
|
updates = {}
|
|
srv = ServerStub(updates)
|
|
|
|
params_found = set() # set of instance accessibles
|
|
objects = []
|
|
|
|
for newclass, sortcheck in [(Newclass1, sortcheck1), (Newclass2, sortcheck2)]:
|
|
o1 = newclass('o1', logger, {'.description':''}, srv)
|
|
o2 = newclass('o2', logger, {'.description':''}, srv)
|
|
for obj in [o1, o2]:
|
|
objects.append(obj)
|
|
ctr_found = set()
|
|
for n, o in obj.accessibles.items():
|
|
# check that instance accessibles are unique objects
|
|
assert o not in params_found
|
|
params_found.add(o)
|
|
assert o.ctr not in ctr_found
|
|
ctr_found.add(o.ctr)
|
|
check_order = [(obj.accessibles[n].ctr, n) for n in sortcheck]
|
|
assert check_order == sorted(check_order)
|
|
|
|
# check for inital updates working properly
|
|
o1 = Newclass1('o1', logger, {'.description':''}, srv)
|
|
expectedBeforeStart = {'target': 0.0, 'status': (Drivable.Status.IDLE, ''),
|
|
'param1': False, 'param2': 1.0, 'a1': 0.0, 'a2': True, 'pollinterval': 5.0,
|
|
'value': 'first'}
|
|
assert updates.pop('o1') == expectedBeforeStart
|
|
o1.earlyInit()
|
|
event = threading.Event()
|
|
o1.startModule(event.set)
|
|
event.wait()
|
|
# should contain polled values
|
|
expectedAfterStart = {'status': (Drivable.Status.IDLE, ''),
|
|
'value': 'second'}
|
|
assert updates.pop('o1') == expectedAfterStart
|
|
|
|
# check in addition if parameters are written
|
|
o2 = Newclass2('o2', logger, {'.description':'', 'a1': 2.7}, srv)
|
|
# no update for b2, as this has to be written
|
|
expectedBeforeStart['a1'] = 2.7
|
|
assert updates.pop('o2') == expectedBeforeStart
|
|
o2.earlyInit()
|
|
event = threading.Event()
|
|
o2.startModule(event.set)
|
|
event.wait()
|
|
# value has changed type, b2 and a1 are written
|
|
expectedAfterStart.update(value=0, b2=True, a1=2.7)
|
|
assert updates.pop('o2') == expectedAfterStart
|
|
assert o2._a1_written == 2.7
|
|
assert o2._b2_written is True
|
|
|
|
assert not updates
|
|
|
|
o1 = Newclass1('o1', logger, {'.description':''}, srv)
|
|
o2 = Newclass2('o2', logger, {'.description':''}, srv)
|
|
assert o2.parameters['a1'].datatype.unit == 'deg/s'
|
|
o2 = Newclass2('o2', logger, {'.description':'', 'value.unit':'mm', 'param2.unit':'mm'}, srv)
|
|
# check datatype is not shared
|
|
assert o1.parameters['param2'].datatype.unit == 'Ohm'
|
|
assert o2.parameters['param2'].datatype.unit == 'mm'
|
|
# check '$' in unit works properly
|
|
assert o2.parameters['a1'].datatype.unit == 'mm/s'
|
|
cfg = Newclass2.configurables
|
|
assert set(cfg.keys()) == {'export', 'group', 'description',
|
|
'meaning', 'visibility', 'implementation', 'interface_classes', 'target', 'stop',
|
|
'status', 'param1', 'param2', 'cmd', 'a2', 'pollinterval', 'b2', 'cmd2', 'value',
|
|
'a1'}
|
|
assert set(cfg['value'].keys()) == {'group', 'export', 'relative_resolution',
|
|
'visibility', 'unit', 'default', 'datatype', 'fmtstr',
|
|
'absolute_resolution', 'poll', 'max', 'min', 'readonly', 'constant',
|
|
'description', 'needscfg'}
|
|
|
|
# check on the level of classes
|
|
# this checks Newclass1 too, as it is inherited by Newclass2
|
|
for baseclass in Newclass2.__mro__:
|
|
# every cmd/param has to be collected to accessibles
|
|
acs = getattr(baseclass, 'accessibles', None)
|
|
if issubclass(baseclass, Module):
|
|
assert acs is not None
|
|
else: # do not check object or mixin
|
|
acs = {}
|
|
for n, o in acs.items():
|
|
# check that class accessibles are not reused as instance accessibles
|
|
assert o not in params_found
|
|
|
|
for o in objects:
|
|
o.earlyInit()
|
|
for o in objects:
|
|
o.initModule()
|