add StructParam
adds a generic solution for creating parameters with struct datatype with their members linked to individual parameters. main use case: ctrlpars read_*/write_* methods are either created for the main (structed) parameter based on the corresponding methods of the individual parameters or the methods for the individual parameters are created based on the methods of the main parameter + disable pylint use-dict-literal Change-Id: I7f1d9fb3d3b2226b548c2999bbfebe2ba5ac285e Reviewed-on: https://forge.frm2.tum.de/review/c/secop/frappy/+/31405 Tested-by: Jenkins Automated Tests <pedersen+jenkins@frm2.tum.de> Reviewed-by: Markus Zolliker <markus.zolliker@psi.ch>
This commit is contained in:
parent
47da14eef9
commit
05189d094a
@ -540,7 +540,6 @@ class Limit(Parameter):
|
||||
if self.hasDatatype():
|
||||
return # the programmer is responsible that a given datatype is correct
|
||||
postfix = self.name.rpartition('_')[-1]
|
||||
postfix = self.name.rpartition('_')[-1]
|
||||
if postfix == 'limits':
|
||||
self.datatype = TupleOf(datatype, datatype)
|
||||
self.default = (datatype.min, datatype.max)
|
||||
@ -562,6 +561,7 @@ PREDEFINED_ACCESSIBLES = {
|
||||
'unit': Parameter, # reserved name
|
||||
'loglevel': Parameter, # reserved name
|
||||
'mode': Parameter, # reserved name
|
||||
'ctrlpars': Parameter, # spec to be confirmed
|
||||
'stop': Command,
|
||||
'reset': Command,
|
||||
'go': Command,
|
||||
|
164
frappy/structparam.py
Normal file
164
frappy/structparam.py
Normal file
@ -0,0 +1,164 @@
|
||||
# *****************************************************************************
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or modify it under
|
||||
# the terms of the GNU General Public License as published by the Free Software
|
||||
# Foundation; either version 2 of the License, or (at your option) any later
|
||||
# version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
||||
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
|
||||
# details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License along with
|
||||
# this program; if not, write to the Free Software Foundation, Inc.,
|
||||
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
||||
#
|
||||
# Module authors:
|
||||
# Markus Zolliker <markus.zolliker@psi.ch>
|
||||
#
|
||||
# *****************************************************************************
|
||||
"""convenience class to create a struct Parameter together with indivdual params
|
||||
|
||||
Usage:
|
||||
|
||||
class Controller(Drivable):
|
||||
|
||||
...
|
||||
|
||||
ctrlpars = StructParam('ctrlpars struct', [
|
||||
('pid_p', 'p', Parameter('control parameter p', FloatRange())),
|
||||
('pid_i', 'i', Parameter('control parameter i', FloatRange())),
|
||||
('pid_d', 'd', Parameter('control parameter d', FloatRange())),
|
||||
], readonly=False)
|
||||
|
||||
...
|
||||
|
||||
then implement either read_ctrlpars and write_ctrlpars or
|
||||
read_pid_p, read_pid_i, read_pid_d, write_pid_p, write_pid_i and write_pid_d
|
||||
|
||||
the methods not implemented will be created automatically
|
||||
"""
|
||||
|
||||
from frappy.core import Parameter, Property
|
||||
from frappy.datatypes import BoolType, DataType, StructOf, ValueType
|
||||
from frappy.errors import ProgrammingError
|
||||
|
||||
|
||||
class StructParam(Parameter):
|
||||
"""create a struct parameter together with individual parameters
|
||||
|
||||
in addition to normal Parameter arguments:
|
||||
|
||||
:param paramdict: dict <member name> of Parameter(...)
|
||||
:param prefix_or_map: either a prefix for the parameter name to add to the member name
|
||||
or a dict <member name> or <paramerter name>
|
||||
"""
|
||||
# use properties, as simple attributes are not considered on copy()
|
||||
paramdict = Property('dict <parametername> of Parameter(...)', ValueType())
|
||||
hasStructRW = Property('has a read_<struct param> or write_<struct param> method',
|
||||
BoolType(), default=False)
|
||||
|
||||
insideRW = 0 # counter for avoiding multiple superfluous updates
|
||||
|
||||
def __init__(self, description=None, paramdict=None, prefix_or_map='', *, datatype=None, readonly=False, **kwds):
|
||||
if isinstance(paramdict, DataType):
|
||||
raise ProgrammingError('second argument must be a dict of Param')
|
||||
if datatype is None and paramdict is not None: # omit the following on Parameter.copy()
|
||||
if isinstance(prefix_or_map, str):
|
||||
prefix_or_map = {m: prefix_or_map + m for m in paramdict}
|
||||
for membername, param in paramdict.items():
|
||||
param.name = prefix_or_map[membername]
|
||||
datatype = StructOf(**{m: p.datatype for m, p in paramdict.items()})
|
||||
kwds['influences'] = [p.name for p in paramdict.values()]
|
||||
self.updateEnable = {}
|
||||
super().__init__(description, datatype, paramdict=paramdict, readonly=readonly, **kwds)
|
||||
|
||||
def __set_name__(self, owner, name):
|
||||
# names of access methods of structed param (e.g. ctrlpars)
|
||||
struct_read_name = f'read_{name}' # e.g. 'read_ctrlpars'
|
||||
struct_write_name = f'write_{name}' # e.h. 'write_ctrlpars'
|
||||
self.hasStructRW = hasattr(owner, struct_read_name) or hasattr(owner, struct_write_name)
|
||||
|
||||
for membername, param in self.paramdict.items():
|
||||
pname = param.name
|
||||
changes = {
|
||||
'readonly': self.readonly,
|
||||
'influences': set(param.influences) | {name},
|
||||
}
|
||||
param.ownProperties.update(changes)
|
||||
param.init(changes)
|
||||
setattr(owner, pname, param)
|
||||
param.__set_name__(owner, param.name)
|
||||
|
||||
if self.hasStructRW:
|
||||
rname = f'read_{pname}'
|
||||
|
||||
if not hasattr(owner, rname):
|
||||
def rfunc(self, membername=membername, struct_read_name=struct_read_name):
|
||||
return getattr(self, struct_read_name)()[membername]
|
||||
|
||||
rfunc.poll = False # read_<struct param> is polled only
|
||||
setattr(owner, rname, rfunc)
|
||||
|
||||
if not self.readonly:
|
||||
wname = f'write_{pname}'
|
||||
if not hasattr(owner, wname):
|
||||
def wfunc(self, value, membername=membername,
|
||||
name=name, rname=rname, struct_write_name=struct_write_name):
|
||||
valuedict = dict(getattr(self, name))
|
||||
valuedict[membername] = value
|
||||
getattr(self, struct_write_name)(valuedict)
|
||||
return getattr(self, rname)()
|
||||
|
||||
setattr(owner, wname, wfunc)
|
||||
|
||||
if not self.hasStructRW:
|
||||
if not hasattr(owner, struct_read_name):
|
||||
def struct_read_func(self, name=name, flist=tuple(
|
||||
(m, f'read_{p.name}') for m, p in self.paramdict.items())):
|
||||
pobj = self.parameters[name]
|
||||
# disable updates generated from the callbacks of individual params
|
||||
pobj.insideRW += 1 # guarded by self.accessLock
|
||||
try:
|
||||
return {m: getattr(self, f)() for m, f in flist}
|
||||
finally:
|
||||
pobj.insideRW -= 1
|
||||
|
||||
setattr(owner, struct_read_name, struct_read_func)
|
||||
|
||||
if not (self.readonly or hasattr(owner, struct_write_name)):
|
||||
|
||||
def struct_write_func(self, value, name=name, funclist=tuple(
|
||||
(m, f'write_{p.name}') for m, p in self.paramdict.items())):
|
||||
pobj = self.parameters[name]
|
||||
pobj.insideRW += 1 # guarded by self.accessLock
|
||||
try:
|
||||
return {m: getattr(self, f)(value[m]) for m, f in funclist}
|
||||
finally:
|
||||
pobj.insideRW -= 1
|
||||
|
||||
setattr(owner, struct_write_name, struct_write_func)
|
||||
|
||||
super().__set_name__(owner, name)
|
||||
|
||||
def finish(self, modobj=None):
|
||||
"""register callbacks for consistency"""
|
||||
super().finish(modobj)
|
||||
if modobj:
|
||||
|
||||
if self.hasStructRW:
|
||||
def cb(value, modobj=modobj, structparam=self):
|
||||
for membername, param in structparam.paramdict.items():
|
||||
setattr(modobj, param.name, value[membername])
|
||||
|
||||
modobj.valueCallbacks[self.name].append(cb)
|
||||
else:
|
||||
for membername, param in self.paramdict.items():
|
||||
def cb(value, modobj=modobj, structparam=self, membername=membername):
|
||||
if not structparam.insideRW:
|
||||
prev = dict(getattr(modobj, structparam.name))
|
||||
prev[membername] = value
|
||||
setattr(modobj, structparam.name, prev)
|
||||
|
||||
modobj.valueCallbacks[param.name].append(cb)
|
132
test/test_ctrlpars.py
Normal file
132
test/test_ctrlpars.py
Normal file
@ -0,0 +1,132 @@
|
||||
# *****************************************************************************
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or modify it under
|
||||
# the terms of the GNU General Public License as published by the Free Software
|
||||
# Foundation; either version 2 of the License, or (at your option) any later
|
||||
# version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
||||
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
|
||||
# details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License along with
|
||||
# this program; if not, write to the Free Software Foundation, Inc.,
|
||||
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
||||
#
|
||||
# Module authors:
|
||||
# Markus Zolliker <markus.zolliker@psi.ch>
|
||||
#
|
||||
# *****************************************************************************
|
||||
"""test frappy.mixins.HasCtrlPars"""
|
||||
|
||||
|
||||
from test.test_modules import LoggerStub, ServerStub
|
||||
from frappy.core import FloatRange, Module, Parameter
|
||||
from frappy.structparam import StructParam
|
||||
|
||||
|
||||
def test_with_read_ctrlpars():
|
||||
class Mod(Module):
|
||||
ctrlpars = StructParam('ctrlpar struct', dict(
|
||||
p = Parameter('control parameter p', FloatRange()),
|
||||
i = Parameter('control parameter i', FloatRange()),
|
||||
d = Parameter('control parameter d', FloatRange()),
|
||||
), 'pid_', readonly=False)
|
||||
|
||||
def read_ctrlpars(self):
|
||||
return self._ctrlpars
|
||||
|
||||
def write_ctrlpars(self, value):
|
||||
self._ctrlpars = value
|
||||
return self.read_ctrlpars()
|
||||
|
||||
logger = LoggerStub()
|
||||
updates = {}
|
||||
srv = ServerStub(updates)
|
||||
|
||||
ms = Mod('ms', logger, {'description':''}, srv)
|
||||
|
||||
value = {'p': 1, 'i': 2, 'd': 3}
|
||||
assert ms.write_ctrlpars(value) == value
|
||||
assert ms.read_ctrlpars() == value
|
||||
assert ms.read_pid_p() == 1
|
||||
assert ms.read_pid_i() == 2
|
||||
assert ms.read_pid_d() == 3
|
||||
assert ms.write_pid_i(5) == 5
|
||||
assert ms.write_pid_d(0) == 0
|
||||
assert ms.read_ctrlpars() == {'p': 1, 'i': 5, 'd': 0}
|
||||
assert set(Mod.ctrlpars.influences) == {'pid_p', 'pid_i', 'pid_d'}
|
||||
assert Mod.pid_p.influences == ('ctrlpars',)
|
||||
assert Mod.pid_i.influences == ('ctrlpars',)
|
||||
assert Mod.pid_d.influences == ('ctrlpars',)
|
||||
|
||||
|
||||
def test_without_read_ctrlpars():
|
||||
class Mod(Module):
|
||||
ctrlpars = StructParam('ctrlpar struct', dict(
|
||||
p = Parameter('control parameter p', FloatRange()),
|
||||
i = Parameter('control parameter i', FloatRange()),
|
||||
d = Parameter('control parameter d', FloatRange()),
|
||||
), readonly=False)
|
||||
|
||||
_pid_p = 0
|
||||
_pid_i = 0
|
||||
|
||||
def read_p(self):
|
||||
return self._pid_p
|
||||
|
||||
def write_p(self, value):
|
||||
self._pid_p = value
|
||||
return self.read_p()
|
||||
|
||||
def read_i(self):
|
||||
return self._pid_i
|
||||
|
||||
def write_i(self, value):
|
||||
self._pid_i = value
|
||||
return self.read_i()
|
||||
|
||||
logger = LoggerStub()
|
||||
updates = {}
|
||||
srv = ServerStub(updates)
|
||||
|
||||
ms = Mod('ms', logger, {'description': ''}, srv)
|
||||
|
||||
value = {'p': 1, 'i': 2, 'd': 3}
|
||||
assert ms.write_ctrlpars(value) == value
|
||||
assert ms.read_ctrlpars() == value
|
||||
assert ms.read_p() == 1
|
||||
assert ms.read_i() == 2
|
||||
assert ms.read_d() == 3
|
||||
assert ms.write_i(5) == 5
|
||||
assert ms.write_d(0) == 0
|
||||
assert ms.read_ctrlpars() == {'p': 1, 'i': 5, 'd': 0}
|
||||
assert set(Mod.ctrlpars.influences) == {'p', 'i', 'd'}
|
||||
assert Mod.p.influences == ('ctrlpars',)
|
||||
assert Mod.i.influences == ('ctrlpars',)
|
||||
assert Mod.d.influences == ('ctrlpars',)
|
||||
|
||||
|
||||
def test_readonly():
|
||||
class Mod(Module):
|
||||
ctrlpars = StructParam('ctrlpar struct', dict(
|
||||
p = Parameter('control parameter p', FloatRange()),
|
||||
i = Parameter('control parameter i', FloatRange()),
|
||||
d = Parameter('control parameter d', FloatRange()),
|
||||
), {'p': 'pp', 'i':'ii', 'd': 'dd'}, readonly=True)
|
||||
|
||||
assert Mod.ctrlpars.readonly is True
|
||||
assert Mod.pp.readonly is True
|
||||
assert Mod.ii.readonly is True
|
||||
assert Mod.dd.readonly is True
|
||||
|
||||
|
||||
def test_order_dependence1():
|
||||
test_without_read_ctrlpars()
|
||||
test_with_read_ctrlpars()
|
||||
|
||||
|
||||
def test_order_dependence2():
|
||||
test_with_read_ctrlpars()
|
||||
test_without_read_ctrlpars()
|
Loading…
x
Reference in New Issue
Block a user