common read/write handlers
introduce CommonReadHandler and CommonWriteHandler for better handling of the case when several parameters are read or written in one go. - ppms: use common handlers + ppms: modify error handling when command result is not OK + store poll attribute on read_* methods Change-Id: I9a9d0972e206956bcb5a83c204fe5f92c69716e3 Reviewed-on: https://forge.frm2.tum.de/review/c/sine2020/secop/playground/+/27822 Tested-by: Jenkins Automated Tests <pedersen+jenkins@frm2.tum.de> Reviewed-by: Enrico Faulhaber <enrico.faulhaber@frm2.tum.de> Reviewed-by: Markus Zolliker <markus.zolliker@psi.ch>
This commit is contained in:
parent
99588fc815
commit
bf1761bbc4
@ -146,11 +146,14 @@ class HasAccessibles(HasProperties):
|
||||
return getattr(self, pname)
|
||||
setattr(self, pname, value) # important! trigger the setter
|
||||
return value
|
||||
|
||||
new_rfunc.poll = getattr(rfunc, 'poll', True) and pobj.poll
|
||||
else:
|
||||
|
||||
def new_rfunc(self, pname=pname):
|
||||
return getattr(self, pname)
|
||||
|
||||
new_rfunc.poll = False
|
||||
new_rfunc.__doc__ = 'auto generated read method for ' + pname
|
||||
|
||||
new_rfunc.wrapped = True # indicate to subclasses that no more wrapping is needed
|
||||
@ -582,7 +585,9 @@ class Module(HasAccessibles):
|
||||
def pollOneParam(self, pname):
|
||||
"""poll parameter <pname> with proper error handling"""
|
||||
try:
|
||||
getattr(self, 'read_' + pname)()
|
||||
rfunc = getattr(self, 'read_' + pname)
|
||||
if rfunc.poll: # TODO: handle this in poller
|
||||
rfunc()
|
||||
except SilentError:
|
||||
pass
|
||||
except SECoPError as e:
|
||||
|
@ -27,17 +27,17 @@ Example 1: combined read/write for multiple parameters
|
||||
|
||||
PID_PARAMS = ['p', 'i', 'd']
|
||||
|
||||
@ReadHandler(PID_PARAMS)
|
||||
def read_pid(self, pname):
|
||||
@CommonReadHandler(PID_PARAMS)
|
||||
def read_pid(self):
|
||||
self.p, self.i, self.d = self.get_pid_from_hw()
|
||||
return Done # Done is indicating that the parameters are already assigned
|
||||
# no return value
|
||||
|
||||
@WriteHandler(PID_PARAMS)
|
||||
def write_pid(self, pname, value):
|
||||
pid = self.get_pid_from_hw() # assume this returns a list
|
||||
pid[PID_PARAMS.index(pname)] = value
|
||||
self.put_pid_to_hw(pid)
|
||||
return self.read_pid()
|
||||
@CommonWriteHandler(PID_PARAMS)
|
||||
def write_pid(self, values):
|
||||
# values is a dict[pname] of value, we convert it to a tuple here
|
||||
self.put_pid_to_hw(values.as_tuple('p', 'i', 'd'')) # or .as_tuple(*PID_PARAMS)
|
||||
self.read_pid()
|
||||
# no return value
|
||||
|
||||
Example 2: addressable HW parameters
|
||||
|
||||
@ -62,6 +62,8 @@ class Handler:
|
||||
func = None
|
||||
method_names = set() # this is shared among all instances of handlers!
|
||||
wrapped = True # allow to use read_* or write_* as name of the decorated method
|
||||
prefix = None # 'read_' or 'write_'
|
||||
poll = None
|
||||
|
||||
def __init__(self, keys):
|
||||
"""initialize the decorator
|
||||
@ -86,55 +88,77 @@ class Handler:
|
||||
return self
|
||||
return self.func.__get__(obj, owner)
|
||||
|
||||
|
||||
class ReadHandler(Handler):
|
||||
"""decorator for read methods"""
|
||||
|
||||
def __set_name__(self, owner, name):
|
||||
"""create the wrapped read_* methods"""
|
||||
"""create the wrapped read_* or write_* methods"""
|
||||
|
||||
self.method_names.discard(self.func.__qualname__)
|
||||
for key in self.keys:
|
||||
|
||||
@wraps(self.func)
|
||||
def wrapped(module, pname=key, func=self.func):
|
||||
value = func(module, pname)
|
||||
if value is not Done:
|
||||
setattr(module, pname, value)
|
||||
return value
|
||||
|
||||
wrapped = self.wrap(key)
|
||||
method_name = self.prefix + key
|
||||
wrapped.wrapped = True
|
||||
method = 'read_' + key
|
||||
rfunc = getattr(owner, method, None)
|
||||
if rfunc and not rfunc.wrapped:
|
||||
raise ProgrammingError('superfluous method %s.%s (overwritten by ReadHandler)'
|
||||
% (owner.__name__, method))
|
||||
setattr(owner, method, wrapped)
|
||||
if self.poll is not None:
|
||||
# wrapped.poll is False when the nopoll decorator is applied either to self.func or to self
|
||||
wrapped.poll = getattr(wrapped, 'poll', self.poll)
|
||||
func = getattr(owner, method_name, None)
|
||||
if func and not func.wrapped:
|
||||
raise ProgrammingError('superfluous method %s.%s (overwritten by %s)'
|
||||
% (owner.__name__, method_name, self.__class__.__name__))
|
||||
setattr(owner, method_name, wrapped)
|
||||
|
||||
def wrap(self, key):
|
||||
"""create wrapped method from self.func
|
||||
|
||||
with name self.prefix + key"""
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class ReadHandler(Handler):
|
||||
"""decorator for read handler methods"""
|
||||
prefix = 'read_'
|
||||
poll = True
|
||||
|
||||
def wrap(self, key):
|
||||
def method(module, pname=key, func=self.func):
|
||||
value = func(module, pname)
|
||||
if value is not Done:
|
||||
setattr(module, pname, value)
|
||||
return value
|
||||
return wraps(self.func)(method)
|
||||
|
||||
|
||||
class CommonReadHandler(ReadHandler):
|
||||
"""decorator for a handler reading several parameters in one go"""
|
||||
def __init__(self, keys):
|
||||
"""initialize the decorator
|
||||
|
||||
:param keys: parameter names (an iterable)
|
||||
"""
|
||||
super().__init__(keys)
|
||||
self.first_key = next(iter(keys))
|
||||
|
||||
def wrap(self, key):
|
||||
def method(module, func=self.func):
|
||||
ret = func(module)
|
||||
if ret not in (None, Done):
|
||||
raise ProgrammingError('a method wrapped with CommonReadHandler must not return any value')
|
||||
|
||||
method = wraps(self.func)(method)
|
||||
method.poll = self.poll if key == self.first_key else False
|
||||
return method
|
||||
|
||||
|
||||
class WriteHandler(Handler):
|
||||
"""decorator for write methods"""
|
||||
"""decorator for write handler methods"""
|
||||
prefix = 'write_'
|
||||
|
||||
def __set_name__(self, owner, name):
|
||||
"""create the wrapped write_* methods"""
|
||||
|
||||
self.method_names.discard(self.func.__qualname__)
|
||||
for key in self.keys:
|
||||
|
||||
@wraps(self.func)
|
||||
def wrapped(module, value, pname=key, func=self.func):
|
||||
value = func(module, pname, value)
|
||||
if value is not Done:
|
||||
setattr(module, pname, value)
|
||||
return value
|
||||
|
||||
wrapped.wrapped = True
|
||||
method = 'write_' + key
|
||||
wfunc = getattr(owner, method, None)
|
||||
if wfunc and not wfunc.wrapped:
|
||||
raise ProgrammingError('superfluous method %s.%s (overwritten by WriteHandler)'
|
||||
% (owner.__name__, method))
|
||||
setattr(owner, method, wrapped)
|
||||
def wrap(self, key):
|
||||
@wraps(self.func)
|
||||
def method(module, value, pname=key, func=self.func):
|
||||
value = func(module, pname, value)
|
||||
if value is not Done:
|
||||
setattr(module, pname, value)
|
||||
return value
|
||||
return method
|
||||
|
||||
|
||||
class WriteParameters(dict):
|
||||
@ -153,33 +177,30 @@ class WriteParameters(dict):
|
||||
return tuple(self[k] for k in keys)
|
||||
|
||||
|
||||
class MultiWriteHandler(Handler):
|
||||
class CommonWriteHandler(WriteHandler):
|
||||
"""decorator for common write handler
|
||||
|
||||
calls the wrapped write method function with values as an argument.
|
||||
- values[pname] returns the to be written value
|
||||
- values['key'] returns a value taken from writeDict
|
||||
- or, if not available return obj.key
|
||||
or, if not available return obj.key
|
||||
- values.as_tuple() returns a tuple with the items in the same order as keys
|
||||
"""
|
||||
|
||||
def __set_name__(self, owner, name):
|
||||
"""create the wrapped write_* methods"""
|
||||
def wrap(self, key):
|
||||
@wraps(self.func)
|
||||
def method(module, value, pname=key, func=self.func):
|
||||
values = WriteParameters(module)
|
||||
values[pname] = value
|
||||
ret = func(module, values)
|
||||
if ret not in (None, Done):
|
||||
raise ProgrammingError('a method wrapped with CommonWriteHandler must not return any value')
|
||||
# remove pname from writeDict. this was not removed in WriteParameters, as it was not missing
|
||||
module.writeDict.pop(pname, None)
|
||||
return method
|
||||
|
||||
self.method_names.discard(self.func.__qualname__)
|
||||
for key in self.keys:
|
||||
|
||||
@wraps(self.func)
|
||||
def wrapped(module, value, pname=key, func=self.func):
|
||||
values = WriteParameters(module)
|
||||
values[pname] = value
|
||||
func(module, values)
|
||||
return Done
|
||||
|
||||
wrapped.wrapped = True
|
||||
method = 'write_' + key
|
||||
wfunc = getattr(owner, method, None)
|
||||
if wfunc and not wfunc.wrapped:
|
||||
raise ProgrammingError('superfluous method %s.%s (overwritten by WriteHandler)'
|
||||
% (owner.__name__, method))
|
||||
setattr(owner, method, wrapped)
|
||||
def nopoll(func):
|
||||
"""decorator to indicate that a read method is not to be polled"""
|
||||
func.poll = False
|
||||
return func
|
||||
|
@ -44,7 +44,7 @@ from secop.modules import Communicator, Done, \
|
||||
Drivable, Parameter, Property, Readable
|
||||
from secop.poller import Poller
|
||||
from secop.io import HasIO
|
||||
from secop.rwhandler import ReadHandler, MultiWriteHandler
|
||||
from secop.rwhandler import CommonReadHandler, CommonWriteHandler
|
||||
|
||||
try:
|
||||
import secop_psi.ppmswindows as ppmshw
|
||||
@ -146,6 +146,12 @@ class PpmsBase(HasIO, Readable):
|
||||
self.value = value
|
||||
self.status = (self.Status.IDLE, '')
|
||||
|
||||
def comm_write(self, command):
|
||||
"""write command and check if result is OK"""
|
||||
reply = self.communicate(command)
|
||||
if reply != 'OK':
|
||||
raise HardwareError('bad reply %r to command %r' % (reply, command))
|
||||
|
||||
|
||||
class Channel(PpmsBase):
|
||||
"""channel base class"""
|
||||
@ -190,23 +196,22 @@ class DriverChannel(Channel):
|
||||
|
||||
param_names = 'current', 'powerlimit'
|
||||
|
||||
@ReadHandler(param_names)
|
||||
def read_params(self, pname=None):
|
||||
@CommonReadHandler(param_names)
|
||||
def read_params(self):
|
||||
no, self.current, self.powerlimit = literal_eval(
|
||||
self.communicate('DRVOUT? %d' % self.no))
|
||||
if self.no != no:
|
||||
raise HardwareError('DRVOUT command: channel number in reply does not match')
|
||||
return Done
|
||||
|
||||
@MultiWriteHandler(param_names)
|
||||
@CommonWriteHandler(param_names)
|
||||
def write_params(self, values):
|
||||
"""write parameters
|
||||
|
||||
:param values: a dict like object containing the parameters to be written
|
||||
"""
|
||||
self.read_params() # make sure parameters are up to date
|
||||
self.set_params('DRVOUT %(no)d,%(current)g,%(powerlimit)g' % values)
|
||||
return self.read_params(None) # read back
|
||||
self.comm_write('DRVOUT %(no)d,%(current)g,%(powerlimit)g' % values)
|
||||
self.read_params() # read back
|
||||
|
||||
|
||||
class BridgeChannel(Channel):
|
||||
@ -225,8 +230,8 @@ class BridgeChannel(Channel):
|
||||
|
||||
param_names = 'enabled', 'enabled', 'powerlimit', 'dcflag', 'readingmode', 'voltagelimit'
|
||||
|
||||
@ReadHandler(param_names)
|
||||
def read_params(self, pname=None):
|
||||
@CommonReadHandler(param_names)
|
||||
def read_params(self):
|
||||
no, excitation, powerlimit, self.dcflag, self.readingmode, voltagelimit = literal_eval(
|
||||
self.communicate('BRIDGE? %d' % self.no))
|
||||
if self.no != no:
|
||||
@ -238,9 +243,8 @@ class BridgeChannel(Channel):
|
||||
self.powerlimit = powerlimit
|
||||
if voltagelimit:
|
||||
self.voltagelimit = voltagelimit
|
||||
return Done
|
||||
|
||||
@MultiWriteHandler(param_names)
|
||||
@CommonWriteHandler(param_names)
|
||||
def write_params(self, values):
|
||||
"""write parameters
|
||||
|
||||
@ -251,9 +255,9 @@ class BridgeChannel(Channel):
|
||||
values['excitation'] = 0
|
||||
values['powerlimit'] = 0
|
||||
values['voltagelimit'] = 0
|
||||
assert self.communicate('BRIDGE %(no)d,%(enabled)g,%(powerlimit)g,%(dcflag)d,'
|
||||
'%(readingmode)d,%(voltagelimit)g' % values) == 'OK'
|
||||
return self.read_params() # read back
|
||||
self.comm_write('BRIDGE %(no)d,%(enabled)g,%(powerlimit)g,%(dcflag)d,'
|
||||
'%(readingmode)d,%(voltagelimit)g' % values)
|
||||
self.read_params() # read back
|
||||
|
||||
|
||||
class Level(PpmsBase):
|
||||
@ -371,13 +375,13 @@ class Temp(PpmsBase, Drivable):
|
||||
|
||||
param_names = 'setpoint', 'workingramp', 'approachmode'
|
||||
|
||||
@ReadHandler(param_names)
|
||||
def read_params(self, pname):
|
||||
@CommonReadHandler(param_names)
|
||||
def read_params(self):
|
||||
settings = literal_eval(self.communicate('TEMP?'))
|
||||
if settings == self._last_settings:
|
||||
# update parameters only on change, as 'ramp' and 'approachmode' are
|
||||
# not always sent to the hardware
|
||||
return Done
|
||||
return
|
||||
self.setpoint, self.workingramp, self.approachmode = self._last_settings = settings
|
||||
if self.setpoint != 10 or not self._wait_at10:
|
||||
self.log.debug('read back target %g %r' % (self.setpoint, self._wait_at10))
|
||||
@ -385,7 +389,6 @@ class Temp(PpmsBase, Drivable):
|
||||
if self.workingramp != 2 or not self._ramp_at_limit:
|
||||
self.log.debug('read back ramp %g %r' % (self.workingramp, self._ramp_at_limit))
|
||||
self.ramp = self.workingramp
|
||||
return Done
|
||||
|
||||
def _write_params(self, setpoint, ramp, approachmode):
|
||||
wait_at10 = False
|
||||
@ -403,8 +406,8 @@ class Temp(PpmsBase, Drivable):
|
||||
self.calc_expected(setpoint, ramp)
|
||||
self.log.debug(
|
||||
'change_temp v %r s %r r %r w %r l %r' % (self.value, setpoint, ramp, wait_at10, ramp_at_limit))
|
||||
assert self.communicate('TEMP %g,%g,%d' % (setpoint, ramp, approachmode)) == 'OK'
|
||||
self.read_params('')
|
||||
self.comm_write('TEMP %g,%g,%d' % (setpoint, ramp, approachmode))
|
||||
self.read_params()
|
||||
|
||||
def update_value_status(self, value, packed_status):
|
||||
if value is None:
|
||||
@ -534,23 +537,22 @@ class Field(PpmsBase, Drivable):
|
||||
|
||||
param_names = 'target', 'ramp', 'approachmode', 'persistentmode'
|
||||
|
||||
@ReadHandler(param_names)
|
||||
def read_params(self, pname):
|
||||
@CommonReadHandler(param_names)
|
||||
def read_params(self):
|
||||
settings = literal_eval(self.communicate('FIELD?'))
|
||||
# print('last_settings tt %s' % repr(self._last_settings))
|
||||
if settings == self._last_settings:
|
||||
# we update parameters only on change, as 'ramp' and 'approachmode' are
|
||||
# not always sent to the hardware
|
||||
return Done
|
||||
return
|
||||
target, ramp, self.approachmode, self.persistentmode = self._last_settings = settings
|
||||
self.target = round(target * 1e-4, 7)
|
||||
self.ramp = ramp * 6e-3
|
||||
return Done
|
||||
|
||||
def _write_params(self, target, ramp, approachmode, persistentmode):
|
||||
assert self.communicate('FIELD %g,%g,%d,%d' % (
|
||||
target * 1e+4, ramp / 6e-3, approachmode, persistentmode)) == 'OK'
|
||||
self.read_params('')
|
||||
self.comm_write('FIELD %g,%g,%d,%d' % (
|
||||
target * 1e+4, ramp / 6e-3, approachmode, persistentmode))
|
||||
self.read_params()
|
||||
|
||||
def update_value_status(self, value, packed_status):
|
||||
if value is None:
|
||||
@ -659,21 +661,20 @@ class Position(PpmsBase, Drivable):
|
||||
|
||||
param_names = 'target', 'speed'
|
||||
|
||||
@ReadHandler(param_names)
|
||||
def read_params(self, pname):
|
||||
@CommonReadHandler(param_names)
|
||||
def read_params(self):
|
||||
settings = literal_eval(self.communicate('MOVE?'))
|
||||
if settings == self._last_settings:
|
||||
# we update parameters only on change, as 'speed' is
|
||||
# not always sent to the hardware
|
||||
return Done
|
||||
return
|
||||
self.target, _, speed = self._last_settings = settings
|
||||
self.speed = (15 - speed) * 0.8
|
||||
return Done
|
||||
|
||||
def _write_params(self, target, speed):
|
||||
speed = int(round(min(14, max(0, 15 - speed / 0.8)), 0))
|
||||
assert self.communicate('MOVE %g,%d,%d' % (target, 0, speed)) == 'OK'
|
||||
return self.read_params('')
|
||||
self.comm_write('MOVE %g,%d,%d' % (target, 0, speed))
|
||||
return self.read_params()
|
||||
|
||||
def update_value_status(self, value, packed_status):
|
||||
if not self.enabled:
|
||||
|
203
test/test_handler.py
Normal file
203
test/test_handler.py
Normal file
@ -0,0 +1,203 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# *****************************************************************************
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or modify it under
|
||||
# the terms of the GNU General Public License as published by the Free Software
|
||||
# Foundation; either version 2 of the License, or (at your option) any later
|
||||
# version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
||||
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
|
||||
# details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License along with
|
||||
# this program; if not, write to the Free Software Foundation, Inc.,
|
||||
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
||||
#
|
||||
# Module authors:
|
||||
# Markus Zolliker <markus.zolliker@psi.ch>
|
||||
#
|
||||
# *****************************************************************************
|
||||
|
||||
|
||||
from secop.rwhandler import ReadHandler, WriteHandler, \
|
||||
CommonReadHandler, CommonWriteHandler, nopoll
|
||||
from secop.core import Module, Parameter, FloatRange
|
||||
|
||||
|
||||
class DispatcherStub:
|
||||
# the first update from the poller comes a very short time after the
|
||||
# initial value from the timestamp. However, in the test below
|
||||
# the second update happens after the updates dict is cleared
|
||||
# -> we have to inhibit the 'omit unchanged update' feature
|
||||
omit_unchanged_within = 0
|
||||
|
||||
def __init__(self, updates):
|
||||
self.updates = updates
|
||||
|
||||
def announce_update(self, modulename, pname, pobj):
|
||||
self.updates.setdefault(modulename, {})
|
||||
if pobj.readerror:
|
||||
self.updates[modulename]['error', pname] = str(pobj.readerror)
|
||||
else:
|
||||
self.updates[modulename][pname] = pobj.value
|
||||
|
||||
|
||||
class LoggerStub:
|
||||
def debug(self, fmt, *args):
|
||||
print(fmt % args)
|
||||
info = warning = exception = error = debug
|
||||
handlers = []
|
||||
|
||||
|
||||
logger = LoggerStub()
|
||||
|
||||
|
||||
class ServerStub:
|
||||
def __init__(self, updates):
|
||||
self.dispatcher = DispatcherStub(updates)
|
||||
|
||||
|
||||
class ModuleTest(Module):
|
||||
def __init__(self, updates=None, **opts):
|
||||
opts['description'] = ''
|
||||
super().__init__('mod', logger, opts, ServerStub(updates or {}))
|
||||
|
||||
|
||||
def test_handler():
|
||||
data = []
|
||||
|
||||
class Mod(ModuleTest):
|
||||
a = Parameter('', FloatRange(), readonly=False, poll=True)
|
||||
b = Parameter('', FloatRange(), readonly=False, poll=True)
|
||||
|
||||
@ReadHandler(['a', 'b'])
|
||||
def read_hdl(self, pname):
|
||||
value = data.pop()
|
||||
data.append(pname)
|
||||
return value
|
||||
|
||||
@WriteHandler(['a', 'b'])
|
||||
def write_hdl(self, pname, value):
|
||||
data.append(pname)
|
||||
return value
|
||||
|
||||
assert Mod.read_a.poll is True
|
||||
assert Mod.read_b.poll is True
|
||||
|
||||
m = Mod()
|
||||
|
||||
data.append(1.2)
|
||||
assert m.read_a() == 1.2
|
||||
assert data.pop() == 'a'
|
||||
|
||||
data.append(1.3)
|
||||
assert m.read_b() == 1.3
|
||||
assert data.pop() == 'b'
|
||||
|
||||
assert m.write_a(1.5) == 1.5
|
||||
assert m.a == 1.5
|
||||
assert data.pop() == 'a'
|
||||
|
||||
assert m.write_b(7) == 7
|
||||
assert m.b == 7
|
||||
assert data.pop() == 'b'
|
||||
|
||||
assert data == []
|
||||
|
||||
|
||||
def test_common_handler():
|
||||
data = []
|
||||
|
||||
class Mod(ModuleTest):
|
||||
a = Parameter('', FloatRange(), readonly=False, poll=True)
|
||||
b = Parameter('', FloatRange(), readonly=False, poll=True)
|
||||
|
||||
@CommonReadHandler(['a', 'b'])
|
||||
def read_hdl(self):
|
||||
self.a, self.b = data.pop()
|
||||
data.append('read_hdl')
|
||||
|
||||
@CommonWriteHandler(['a', 'b'])
|
||||
def write_hdl(self, values):
|
||||
self.a = values['a']
|
||||
self.b = values['b']
|
||||
data.append('write_hdl')
|
||||
|
||||
assert set([Mod.read_a.poll, Mod.read_b.poll]) == {True, False}
|
||||
|
||||
m = Mod(a=1, b=2)
|
||||
assert m.writeDict == {'a': 1, 'b': 2}
|
||||
m.write_a(3)
|
||||
assert m.a == 3
|
||||
assert m.b == 2
|
||||
assert data.pop() == 'write_hdl'
|
||||
assert m.writeDict == {}
|
||||
|
||||
m.write_b(4)
|
||||
assert m.a == 3
|
||||
assert m.b == 4
|
||||
assert data.pop() == 'write_hdl'
|
||||
|
||||
data.append((3, 4))
|
||||
m.read_a()
|
||||
assert m.a == 3
|
||||
assert m.b == 4
|
||||
assert data.pop() == 'read_hdl'
|
||||
|
||||
data.append((1.1, 2.2))
|
||||
m.read_b()
|
||||
assert m.a == 1.1
|
||||
assert m.b == 2.2
|
||||
assert data.pop() == 'read_hdl'
|
||||
|
||||
assert data == []
|
||||
|
||||
|
||||
def test_nopoll():
|
||||
class Mod1(ModuleTest):
|
||||
a = Parameter('', FloatRange(), readonly=False, poll=True)
|
||||
b = Parameter('', FloatRange(), readonly=False, poll=True)
|
||||
|
||||
@ReadHandler(['a', 'b'])
|
||||
def read_hdl(self):
|
||||
pass
|
||||
|
||||
assert Mod1.read_a.poll is True
|
||||
assert Mod1.read_b.poll is True
|
||||
|
||||
class Mod2(ModuleTest):
|
||||
a = Parameter('', FloatRange(), readonly=False, poll=True)
|
||||
b = Parameter('', FloatRange(), readonly=False, poll=True)
|
||||
|
||||
@CommonReadHandler(['a', 'b'])
|
||||
def read_hdl(self):
|
||||
pass
|
||||
|
||||
assert Mod2.read_a.poll is True
|
||||
assert Mod2.read_b.poll is False
|
||||
|
||||
class Mod3(ModuleTest):
|
||||
a = Parameter('', FloatRange(), readonly=False, poll=True)
|
||||
b = Parameter('', FloatRange(), readonly=False, poll=True)
|
||||
|
||||
@ReadHandler(['a', 'b'])
|
||||
@nopoll
|
||||
def read_hdl(self):
|
||||
pass
|
||||
|
||||
assert Mod3.read_a.poll is False
|
||||
assert Mod3.read_b.poll is False
|
||||
|
||||
class Mod4(ModuleTest):
|
||||
a = Parameter('', FloatRange(), readonly=False, poll=True)
|
||||
b = Parameter('', FloatRange(), readonly=False, poll=True)
|
||||
|
||||
@nopoll
|
||||
@ReadHandler(['a', 'b'])
|
||||
def read_hdl(self):
|
||||
pass
|
||||
|
||||
assert Mod4.read_a.poll is False
|
||||
assert Mod4.read_b.poll is False
|
Loading…
x
Reference in New Issue
Block a user