allow super calls on read_/write_ methods

instead of wrapping the access methods on the class directly,
create a wrapper class with the wrapped methods.

Change-Id: I93f3985bd06d6956b42a6690c087fb125e460ef9
Reviewed-on: https://forge.frm2.tum.de/review/c/secop/frappy/+/30448
Tested-by: Jenkins Automated Tests <pedersen+jenkins@frm2.tum.de>
Reviewed-by: Enrico Faulhaber <enrico.faulhaber@frm2.tum.de>
Reviewed-by: Alexander Zaft <a.zaft@fz-juelich.de>
Reviewed-by: Markus Zolliker <markus.zolliker@psi.ch>
This commit is contained in:
zolliker 2023-02-17 07:49:36 +01:00
parent ca4297e3a6
commit 4c577cf83d
4 changed files with 147 additions and 88 deletions

View File

@ -26,7 +26,6 @@
import time import time
import threading import threading
from collections import OrderedDict from collections import OrderedDict
from functools import wraps
from frappy.datatypes import ArrayOf, BoolType, EnumType, FloatRange, \ from frappy.datatypes import ArrayOf, BoolType, EnumType, FloatRange, \
IntRange, StatusType, StringType, TextType, TupleOf, DiscouragedConversion IntRange, StatusType, StringType, TextType, TupleOf, DiscouragedConversion
@ -43,6 +42,8 @@ Done = UniqueObject('Done')
indicating that the setter is triggered already""" indicating that the setter is triggered already"""
wrapperClasses = {}
class HasAccessibles(HasProperties): class HasAccessibles(HasProperties):
"""base class of Module """base class of Module
@ -52,9 +53,14 @@ class HasAccessibles(HasProperties):
wrap read_*/write_* methods wrap read_*/write_* methods
(so the dispatcher will get notified of changed values) (so the dispatcher will get notified of changed values)
""" """
isWrapped = False
checkedMethods = set()
@classmethod @classmethod
def __init_subclass__(cls): # pylint: disable=too-many-branches def __init_subclass__(cls): # pylint: disable=too-many-branches
super().__init_subclass__() super().__init_subclass__()
if cls.isWrapped:
return
# merge accessibles from all sub-classes, treat overrides # merge accessibles from all sub-classes, treat overrides
# for now, allow to use also the old syntax (parameters/commands dict) # for now, allow to use also the old syntax (parameters/commands dict)
accessibles = OrderedDict() # dict of accessibles accessibles = OrderedDict() # dict of accessibles
@ -102,84 +108,82 @@ class HasAccessibles(HasProperties):
# declarations within the same class # declarations within the same class
cls.accessibles = accessibles cls.accessibles = accessibles
# Correct naming of EnumTypes cls.wrappedAttributes = {'isWrapped': True}
# moved to Parameter.__set_name__ # create wrappers for access methods
wrapped_name = '_' + cls.__name__
# check validity of Parameter entries
for pname, pobj in accessibles.items(): for pname, pobj in accessibles.items():
# XXX: create getters for the units of params ??
# wrap of reading/writing funcs # wrap of reading/writing funcs
if not isinstance(pobj, Parameter): if not isinstance(pobj, Parameter):
# nothing to do for Commands # nothing to do for Commands
continue continue
rfunc = getattr(cls, 'read_' + pname, None) rname = 'read_' + pname
rfunc = getattr(cls, rname, None)
# create wrapper
if rfunc:
# create wrapper except when read function is already wrapped or auto generatoed def new_rfunc(self, pname=pname, rfunc=rfunc):
if not getattr(rfunc, 'wrapped', False): with self.accessLock:
try:
value = rfunc(self)
self.log.debug("read_%s returned %r", pname, value)
except Exception as e:
self.log.debug("read_%s failed with %r", pname, e)
self.announceUpdate(pname, None, e)
raise
if value is Done:
return getattr(self, pname)
setattr(self, pname, value) # important! trigger the setter
return value
if rfunc: new_rfunc.poll = getattr(rfunc, 'poll', True)
else:
@wraps(rfunc) # handles __wrapped__ and __doc__ def new_rfunc(self, pname=pname):
def new_rfunc(self, pname=pname, rfunc=rfunc): return getattr(self, pname)
with self.accessLock:
try:
value = rfunc(self)
self.log.debug("read_%s returned %r", pname, value)
except Exception as e:
self.log.debug("read_%s failed with %r", pname, e)
self.announceUpdate(pname, None, e)
raise
if value is Done:
return getattr(self, pname)
setattr(self, pname, value) # important! trigger the setter
return value
new_rfunc.poll = getattr(rfunc, 'poll', True) new_rfunc.poll = False
else:
def new_rfunc(self, pname=pname): new_rfunc.__name__ = rname
return getattr(self, pname) new_rfunc.__qualname__ = wrapped_name + '.' + rname
new_rfunc.__module__ = cls.__module__
cls.wrappedAttributes[rname] = new_rfunc
new_rfunc.poll = False wname = 'write_' + pname
new_rfunc.__doc__ = 'auto generated read method for ' + pname wfunc = getattr(cls, wname, None)
if wfunc:
# allow write method even when parameter is readonly, but internally writable
new_rfunc.wrapped = True # indicate to subclasses that no more wrapping is needed def new_wfunc(self, value, pname=pname, wfunc=wfunc):
setattr(cls, 'read_' + pname, new_rfunc) with self.accessLock:
pobj = self.accessibles[pname]
self.log.debug('validate %r for %r', value, pname)
# we do not need to handle errors here, we do not
# want to make a parameter invalid, when a write failed
new_value = pobj.datatype(value)
new_value = wfunc(self, new_value)
self.log.debug('write_%s(%r) returned %r', pname, value, new_value)
if new_value is Done:
# setattr(self, pname, getattr(self, pname))
return getattr(self, pname)
setattr(self, pname, new_value) # important! trigger the setter
return new_value
wfunc = getattr(cls, 'write_' + pname, None) elif pobj.readonly:
if not pobj.readonly or wfunc: # allow write_ method even when pobj is not readonly new_wfunc = None
# create wrapper except when write function is already wrapped or auto generated else:
if not getattr(wfunc, 'wrapped', False):
if wfunc: def new_wfunc(self, value, pname=pname):
setattr(self, pname, value)
return value
@wraps(wfunc) # handles __wrapped__ and __doc__ if new_wfunc:
def new_wfunc(self, value, pname=pname, wfunc=wfunc): new_wfunc.__name__ = wname
with self.accessLock: new_wfunc.__qualname__ = wrapped_name + '.' + wname
pobj = self.accessibles[pname] new_wfunc.__module__ = cls.__module__
self.log.debug('validate %r for %r', value, pname) cls.wrappedAttributes[wname] = new_wfunc
# we do not need to handle errors here, we do not
# want to make a parameter invalid, when a write failed
new_value = pobj.datatype(value)
new_value = wfunc(self, new_value)
self.log.debug('write_%s(%r) returned %r', pname, value, new_value)
if new_value is Done:
# setattr(self, pname, getattr(self, pname))
return getattr(self, pname)
setattr(self, pname, new_value) # important! trigger the setter
return new_value
else:
def new_wfunc(self, value, pname=pname): cls.checkedMethods.update(cls.wrappedAttributes)
setattr(self, pname, value)
return value
new_wfunc.__doc__ = 'auto generated write method for ' + pname
new_wfunc.wrapped = True # indicate to subclasses that no more wrapping is needed
setattr(cls, 'write_' + pname, new_wfunc)
# check for programming errors # check for programming errors
for attrname in dir(cls): for attrname in dir(cls):
@ -189,7 +193,7 @@ class HasAccessibles(HasProperties):
if prefix == 'do': if prefix == 'do':
raise ProgrammingError('%r: old style command %r not supported anymore' raise ProgrammingError('%r: old style command %r not supported anymore'
% (cls.__name__, attrname)) % (cls.__name__, attrname))
if prefix in ('read', 'write') and not getattr(getattr(cls, attrname), 'wrapped', False): if prefix in ('read', 'write') and attrname not in cls.checkedMethods:
raise ProgrammingError('%s.%s defined, but %r is no parameter' raise ProgrammingError('%s.%s defined, but %r is no parameter'
% (cls.__name__, attrname, pname)) % (cls.__name__, attrname, pname))
@ -206,6 +210,13 @@ class HasAccessibles(HasProperties):
res[param][pn] = pv res[param][pn] = pv
cls.configurables = res cls.configurables = res
def __new__(cls, *args, **kwds):
wrapper_class = wrapperClasses.get(cls)
if wrapper_class is None:
wrapper_class = type('_' + cls.__name__, (cls,), cls.wrappedAttributes)
wrapperClasses[cls] = wrapper_class
return super().__new__(wrapper_class)
class Feature(HasAccessibles): class Feature(HasAccessibles):
"""all things belonging to a small, predefined functionality influencing the working of a module """all things belonging to a small, predefined functionality influencing the working of a module

View File

@ -72,9 +72,8 @@ def wraps(func):
class Handler: class Handler:
func = None func = None
method_names = set() # this is shared among all instances of handlers!
wrapped = True # allow to use read_* or write_* as name of the decorated method
prefix = None # 'read_' or 'write_' prefix = None # 'read_' or 'write_'
method_names = set() # global set registering used method names
poll = None poll = None
def __init__(self, keys): def __init__(self, keys):
@ -86,12 +85,13 @@ class Handler:
def __call__(self, func): def __call__(self, func):
"""decorator call""" """decorator call"""
self.func = func if (func.__module__, func.__qualname__) in self.method_names:
if func.__qualname__ in self.method_names: # make sure method name is not used twice
# (else __set_name__ will not be called)
raise ProgrammingError('duplicate method %r' % func.__qualname__) raise ProgrammingError('duplicate method %r' % func.__qualname__)
self.func = func
func.wrapped = False func.wrapped = False
# __qualname__ used here (avoid conflicts between different modules) self.method_names.add((func.__module__, func.__qualname__))
self.method_names.add(func.__qualname__)
return self return self
def __get__(self, obj, owner=None): def __get__(self, obj, owner=None):
@ -102,8 +102,9 @@ class Handler:
def __set_name__(self, owner, name): def __set_name__(self, owner, name):
"""create the wrapped read_* or write_* methods""" """create the wrapped read_* or write_* methods"""
# at this point, this 'method_names' entry is no longer used -> delete
self.method_names.discard(self.func.__qualname__) self.method_names.discard((self.func.__module__, self.func.__qualname__))
owner.checkedMethods.add(name)
for key in self.keys: for key in self.keys:
wrapped = self.wrap(key) wrapped = self.wrap(key)
method_name = self.prefix + key method_name = self.prefix + key
@ -112,7 +113,7 @@ class Handler:
# wrapped.poll is False when the nopoll decorator is applied either to self.func or to self # wrapped.poll is False when the nopoll decorator is applied either to self.func or to self
wrapped.poll = getattr(wrapped, 'poll', self.poll) wrapped.poll = getattr(wrapped, 'poll', self.poll)
func = getattr(owner, method_name, None) func = getattr(owner, method_name, None)
if func and not func.wrapped: if func and method_name in owner.__dict__:
raise ProgrammingError('superfluous method %s.%s (overwritten by %s)' raise ProgrammingError('superfluous method %s.%s (overwritten by %s)'
% (owner.__name__, method_name, self.__class__.__name__)) % (owner.__name__, method_name, self.__class__.__name__))
setattr(owner, method_name, wrapped) setattr(owner, method_name, wrapped)

View File

@ -84,10 +84,9 @@ def test_handler():
data.append(pname) data.append(pname)
return value return value
assert Mod.read_a.poll is True
assert Mod.read_b.poll is True
m = Mod() m = Mod()
assert m.read_a.poll is True
assert m.read_b.poll is True
data.append(1.2) data.append(1.2)
assert m.read_a() == 1.2 assert m.read_a() == 1.2
@ -130,9 +129,10 @@ def test_common_handler():
self.b = values['b'] self.b = values['b']
data.append('write_hdl') data.append('write_hdl')
assert set([Mod.read_a.poll, Mod.read_b.poll]) == {True, False}
m = Mod(a=1, b=2) m = Mod(a=1, b=2)
assert set([m.read_a.poll, m.read_b.poll]) == {True, False}
assert m.writeDict == {'a': 1, 'b': 2} assert m.writeDict == {'a': 1, 'b': 2}
m.write_a(3) m.write_a(3)
assert m.a == 3 assert m.a == 3
@ -172,8 +172,10 @@ def test_nopoll():
def read_hdl(self): def read_hdl(self):
pass pass
assert Mod1.read_a.poll is True m = Mod1()
assert Mod1.read_b.poll is True print(m, m.read_a)
assert m.read_a.poll is True
assert m.read_b.poll is True
class Mod2(ModuleTest): class Mod2(ModuleTest):
a = Parameter('', FloatRange(), readonly=False) a = Parameter('', FloatRange(), readonly=False)
@ -183,8 +185,9 @@ def test_nopoll():
def read_hdl(self): def read_hdl(self):
pass pass
assert Mod2.read_a.poll is True m = Mod2()
assert Mod2.read_b.poll is False assert m.read_a.poll is True
assert m.read_b.poll is False
class Mod3(ModuleTest): class Mod3(ModuleTest):
a = Parameter('', FloatRange(), readonly=False) a = Parameter('', FloatRange(), readonly=False)
@ -195,8 +198,9 @@ def test_nopoll():
def read_hdl(self): def read_hdl(self):
pass pass
assert Mod3.read_a.poll is False m = Mod3()
assert Mod3.read_b.poll is False assert m.read_a.poll is False
assert m.read_b.poll is False
class Mod4(ModuleTest): class Mod4(ModuleTest):
a = Parameter('', FloatRange(), readonly=False) a = Parameter('', FloatRange(), readonly=False)
@ -207,8 +211,9 @@ def test_nopoll():
def read_hdl(self): def read_hdl(self):
pass pass
assert Mod4.read_a.poll is False m = Mod4()
assert Mod4.read_b.poll is False assert m.read_a.poll is False
assert m.read_b.poll is False
class Mod5(ModuleTest): class Mod5(ModuleTest):
a = Parameter('', FloatRange(), readonly=False) a = Parameter('', FloatRange(), readonly=False)
@ -219,8 +224,9 @@ def test_nopoll():
def read_hdl(self): def read_hdl(self):
pass pass
assert Mod5.read_a.poll is False m = Mod5()
assert Mod5.read_b.poll is False assert m.read_a.poll is False
assert m.read_b.poll is False
class Mod6(ModuleTest): class Mod6(ModuleTest):
a = Parameter('', FloatRange(), readonly=False) a = Parameter('', FloatRange(), readonly=False)
@ -231,5 +237,6 @@ def test_nopoll():
def read_hdl(self): def read_hdl(self):
pass pass
assert Mod6.read_a.poll is False m = Mod6()
assert Mod6.read_b.poll is False assert m.read_a.poll is False
assert m.read_b.poll is False

View File

@ -687,3 +687,43 @@ def test_deferred_main_unit(config, dynamicunit, finalunit, someunit):
assert m.parameters['ramp'].datatype.unit == finalunit + '/min' assert m.parameters['ramp'].datatype.unit == finalunit + '/min'
# when someparam.unit is configured, this differs from finalunit # when someparam.unit is configured, this differs from finalunit
assert m.parameters['someparam'].datatype.unit == someunit assert m.parameters['someparam'].datatype.unit == someunit
def test_super_call():
class Base(Readable):
def read_status(self):
return Readable.Status.IDLE, 'base'
class Mod(Base):
def read_status(self):
code, text = super().read_status()
return code, text + ' (extended)'
class DispatcherStub1:
def __init__(self, updates):
self.updates = updates
def announce_update(self, modulename, pname, pobj):
if pobj.readerror:
raise pobj.readerror
self.updates.append((modulename, pname, pobj.value))
class ServerStub1:
def __init__(self, updates):
self.dispatcher = DispatcherStub1(updates)
updates = []
srv = ServerStub1(updates)
b = Base('b', logger, {'description': ''}, srv)
b.read_status()
assert updates == [('b', 'status', ('IDLE', 'base'))]
updates.clear()
m = Mod('m', logger, {'description': ''}, srv)
m.read_status()
# in the version before change 'allow super calls on read_/write_ methods'
# updates would contain two items
assert updates == [('m', 'status', ('IDLE', 'base (extended)'))]
assert type(m).__name__ == '_Mod'
assert type(m).__mro__[1:5] == (Mod, Base, Readable, Module)