frappy/test/test_poller.py
Markus Zolliker 44eeea1159 improve error handling
in the current implementation, in case polling results in an error,
an error_update is generated. But on activate, for this parameter
a normal update will be generated, indicating that this value is ok.

With this change, parameters have an additonal attribue 'readerror',
which save the error generated on the last read, or None, if the
read was successful.

In addition, subsequent repeated errors are only reported the first
time.

Change-Id: I39f18d17debadd8aa5b904e59998d2172f3f11b9
Reviewed-on: https://forge.frm2.tum.de/review/c/sine2020/secop/playground/+/21974
Tested-by: JenkinsCodeReview <bjoern_pedersen@frm2.tum.de>
Reviewed-by: Markus Zolliker <markus.zolliker@psi.ch>
2019-12-06 15:41:30 +01:00

238 lines
8.4 KiB
Python

# -*- coding: utf-8 -*-
# *****************************************************************************
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
#
# *****************************************************************************
"""test poller."""
import time
from collections import OrderedDict
import pytest
from secop.modules import Drivable
from secop.poller import Poller, REGULAR, DYNAMIC, SLOW
Status = Drivable.Status
class Time:
STARTTIME = 1000 # artificial time zero
def __init__(self):
self.reset()
self.finish = float('inf')
self.stop = lambda : None
self.commtime = 0.05 # time needed for 1 poll
def reset(self, lifetime=10):
self.seconds = self.STARTTIME
self.idletime = 0.0
self.busytime = 0.0
self.finish = self.STARTTIME + lifetime
def time(self):
if self.seconds > self.finish:
self.finish = float('inf')
self.stop()
return self.seconds
def sleep(self, seconds):
assert 0 <= seconds <= 24*3600
self.idletime += seconds
self.seconds += seconds
def busy(self, seconds):
assert seconds >= 0
self.seconds += seconds
self.busytime += seconds
artime = Time() # artificial test time
@pytest.fixture(autouse=True)
def patch_time(monkeypatch):
monkeypatch.setattr(time, 'time', artime.time)
class Event:
def __init__(self):
self.flag = False
def wait(self, timeout):
artime.sleep(max(0,timeout))
def set(self):
self.flag=True
def is_set(self):
return self.flag
class Parameter:
def __init__(self, name, readonly, poll, polltype, interval):
self.poll = poll
self.polltype = polltype # used for check only
self.export = name
self.readonly = readonly
self.interval = interval
self.timestamp = 0
self.handler = None
self.reset()
def reset(self):
self.cnt = 0
self.span = 0
self.maxspan = 0
def rfunc(self):
artime.busy(artime.commtime)
now = artime.time()
self.span = now - self.timestamp
self.maxspan = max(self.maxspan, self.span)
self.timestamp = now
self.cnt += 1
return True
def __repr__(self):
return 'Parameter(%s)' % ", ".join("%s=%r" % item for item in self.__dict__.items())
class Module:
properties = {}
pollerClass = Poller
iodev = 'common_iodev'
def __init__(self, name, pollinterval=5, fastfactor=0.25, slowfactor=4, busy=False,
counts=(), auto=None):
'''create a dummy module
nauto, ndynamic, nregular, nslow are the number of parameters of each polltype
'''
self.pollinterval = pollinterval
self.fast_pollfactor = fastfactor
self.slow_pollfactor = slowfactor
self.parameters = OrderedDict()
self.name = name
self.is_busy = busy
if auto is not None:
self.pvalue = self.addPar('value', True, auto or DYNAMIC, DYNAMIC)
# readonly = False should not matter:
self.pstatus = self.addPar('status', False, auto or DYNAMIC, DYNAMIC)
self.pregular = self.addPar('regular', True, auto or REGULAR, REGULAR)
self.pslow = self.addPar('slow', False, auto or SLOW, SLOW)
self.addPar('notpolled', True, False, 0)
self.counts = 'auto'
else:
ndynamic, nregular, nslow = counts
for i in range(ndynamic):
self.addPar('%s:d%d' % (name, i), True, DYNAMIC, DYNAMIC)
for i in range(nregular):
self.addPar('%s:r%d' % (name, i), True, REGULAR, REGULAR)
for i in range(nslow):
self.addPar('%s:s%d' % (name, i), False, SLOW, SLOW)
self.counts = counts
def addPar(self, name, readonly, poll, expected_polltype):
# self.count[polltype] += 1
expected_interval = self.pollinterval
if expected_polltype == SLOW:
expected_interval *= self.slow_pollfactor
elif expected_polltype == DYNAMIC and self.is_busy:
expected_interval *= self.fast_pollfactor
pobj = Parameter(name, readonly, poll, expected_polltype, expected_interval)
setattr(self, 'read_' + pobj.export, pobj.rfunc)
self.parameters[pobj.export] = pobj
return pobj
def isBusy(self):
return self.is_busy
def pollOne(self, pname):
getattr(self, 'read_' + pname)()
def __repr__(self):
rdict = self.__dict__.copy()
rdict.pop('parameters')
return 'Module(%r, counts=%r, f=%r, pollinterval=%g, is_busy=%r)' % (self.name,
self.counts, (self.fast_pollfactor, self.slow_pollfactor, 1),
self.pollinterval, self.is_busy)
module_list = [
[Module('x', 3.0, 0.125, 10, False, auto=True),
Module('y', 3.0, 0.125, 10, False, auto=False)],
[Module('a', 1.0, 0.25, 4, True, (5, 5, 10)),
Module('b', 2.0, 0.25, 4, True, (5, 5, 50))],
[Module('c', 1.0, 0.25, 4, False, (5, 0, 0))],
[Module('d', 1.0, 0.25, 4, True, (0, 9, 0))],
[Module('e', 1.0, 0.25, 4, True, (0, 0, 9))],
[Module('f', 1.0, 0.25, 4, True, (0, 0, 0))],
]
@pytest.mark.parametrize('modules', module_list)
def test_Poller(modules):
# check for proper timing
for overloaded in False, True:
artime.reset()
count = {DYNAMIC: 0, REGULAR: 0, SLOW: 0}
maxspan = {DYNAMIC: 0, REGULAR: 0, SLOW: 0}
pollTable = dict()
for module in modules:
Poller.add_to_table(pollTable, module)
for pobj in module.parameters.values():
if pobj.poll:
maxspan[pobj.polltype] = max(maxspan[pobj.polltype], pobj.interval)
count[pobj.polltype] += 1
pobj.reset()
assert len(pollTable) == 1
poller = pollTable[(Poller, 'common_iodev')]
artime.stop = poller.stop
poller._stopped = Event() # patch Event.wait
assert (sum(count.values()) > 0) == bool(poller)
def started_callback(modules=modules):
for module in modules:
for pobj in module.parameters.values():
assert pobj.cnt == bool(pobj.poll) # all parameters have to be polled once
pobj.reset() # set maxspan and cnt to 0
if overloaded:
# overloaded scenario
artime.commtime = 1.0
ncycles = 10
if count[SLOW] > 0:
cycletime = (count[REGULAR] + 1) * count[SLOW] * 2
else:
cycletime = max(count[REGULAR], count[DYNAMIC]) * 2
artime.reset(cycletime * ncycles * 1.01) # poller will quit given time
poller.run(started_callback)
total = artime.time() - artime.STARTTIME
for module in modules:
for pobj in module.parameters.values():
if pobj.poll:
# average_span = total / (pobj.cnt + 1)
assert total / (pobj.cnt + 1) <= max(cycletime, pobj.interval * 1.1)
else:
# normal scenario
artime.commtime = 0.001
artime.reset(max(maxspan.values()) * 5) # poller will quit given time
poller.run(started_callback)
total = artime.time() - artime.STARTTIME
for module in modules:
for pobj in module.parameters.values():
if pobj.poll:
assert pobj.maxspan <= maxspan[pobj.polltype] * 1.1
assert (pobj.cnt + 1) * pobj.interval >= total * 0.99
assert abs(pobj.span - pobj.interval) < 0.01
pobj.reset()