improved poller

the poller is now a separate (python-)module
- 3 categories of polled parameters:
  - regular
  - dynamic: accelerated when module is busy
  - slow: less frequent, with lower priority
- several modules might share a poller. this makes sense, when a
  they share a common communicator
- modules using the new poller must define pollerClass
- pollerClass may be define even an other poller implementation
- the old simple poller is still used on Readables without pollerClass
- the poller can not be configured in the config file only, as the
  poll attributes have special values for this poller

Change-Id: Ifd811beeb45733bcbf801608ade877a3a601c24a
Reviewed-on: https://forge.frm2.tum.de/review/c/sine2020/secop/playground/+/21066
Reviewed-by: Markus Zolliker <markus.zolliker@psi.ch>
Tested-by: JenkinsCodeReview <bjoern_pedersen@frm2.tum.de>
This commit is contained in:
zolliker 2019-08-23 10:12:58 +02:00
parent 6772455dba
commit f21fcfce9d
4 changed files with 502 additions and 6 deletions

View File

@ -283,7 +283,11 @@ class Readable(Module):
def startModule(self, started_callback):
'''start polling thread'''
mkthread(self.__pollThread, started_callback)
if hasattr(self, 'pollerClass'): # an other poller is used
started_callback()
else:
# basic poller kept for reference
mkthread(self.__pollThread, started_callback)
def __pollThread(self, started_callback):
while True:

249
secop/poller.py Normal file
View File

@ -0,0 +1,249 @@
# -*- coding: utf-8 -*-
# *****************************************************************************
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
#
# *****************************************************************************
'''general, advanced frappy poller
Usage examples:
any Module which want to be polled with a specific Poller must define
the pollerClass class variable:
class MyModule(Readable):
...
pollerClass = poller.Poller
...
modules having a parameter 'iodev' with the same value will share the same poller
'''
import time
from threading import Event
from heapq import heapify, heapreplace
from secop.lib import mkthread, formatException
from secop.errors import ProgrammingError
# poll types:
AUTO = 1 # equivalent to True, converted to REGULAR, SLOW or DYNAMIC
SLOW = 2
REGULAR = 3
DYNAMIC = 4
class PollerBase(object):
startup_timeout = 30 # default timeout for startup
name = 'unknown' # to be overridden in implementors __init__ method
@classmethod
def add_to_table(cls, table, module):
'''sort module into poller table
table is a dict, with (<pollerClass>, <name>) as the key, and the
poller as value.
<name> is module.iodev or module.name, if iodev is not present
'''
try:
pollerClass = module.pollerClass
except AttributeError:
return # no pollerClass -> fall back to simple poller
# for modules with the same iodev, a common poller is used,
# modules without iodev all get their own poller
name = getattr(module, 'iodev', module.name)
poller = table.get((pollerClass, name), None)
if poller is None:
poller = pollerClass(name)
table[(pollerClass, name)] = poller
poller.add_to_poller(module)
def start(self, started_callback):
'''start poller thread
started_callback to be called after all poll items were read at least once
'''
mkthread(self.run, started_callback)
return self.startup_timeout
def run(self, started_callback):
'''poller thread function
started_callback to be called after all poll items were read at least once
'''
raise NotImplementedError
def stop(self):
'''stop polling'''
raise NotImplementedError
def __bool__(self):
'''is there any poll item?'''
raise NotImplementedError
def __repr__(self):
return '%s(%r)' % (self.__class__.__name__, self.name)
__nonzero__ = __bool__ # Py2/3 compat
class Poller(PollerBase):
'''a standard poller
parameters may have the following polltypes:
- REGULAR: by default used for readonly parameters with poll=True
- SLOW: by default used for readonly=False parameters with poll=True.
slow polls happen with lower priority, but at least one parameter
is polled with regular priority within self.module.pollinterval.
Scheduled to poll every slowfactor * module.pollinterval
- DYNAMIC: by default used for 'value' and 'status'
When busy, scheduled to poll every fastfactor * module.pollinterval
'''
DEFAULT_FACTORS = {SLOW: 4, DYNAMIC: 0.25, REGULAR: 1}
def __init__(self, name):
'''create a poller'''
self.queues = {polltype: [] for polltype in self.DEFAULT_FACTORS}
self._stopped = Event()
self.maxwait = 3600
self.name = name
def add_to_poller(self, module):
factors = self.DEFAULT_FACTORS.copy()
try:
factors[DYNAMIC] = module.fast_pollfactor
except AttributeError:
pass
try:
factors[SLOW] = module.slow_pollfactor
except AttributeError:
pass
self.maxwait = min(self.maxwait, getattr(module, 'max_polltestperiod', 10))
try:
self.startup_timeout = max(self.startup_timeout, module.startup_timeout)
except AttributeError:
pass
# at the beginning, queues are simple lists
# later, they will be converted to heaps
for pname, pobj in module.parameters.items():
polltype = int(pobj.poll)
rfunc = getattr(module, 'read_' + pname, None)
if not polltype or not rfunc:
continue
if not hasattr(module, 'pollinterval'):
raise ProgrammingError("module %s must have a pollinterval"
% module.name)
if polltype == AUTO: # covers also pobj.poll == True
if pname == 'value' or pname == 'status':
polltype = DYNAMIC
elif pobj.readonly:
polltype = REGULAR
else:
polltype = SLOW
# placeholders 0 are used for due, lastdue and idx
self.queues[polltype].append((0, 0,
(0, module, pobj, rfunc, factors[polltype])))
def poll_next(self, polltype):
'''try to poll next item
advance in queue until
- an item is found which is really due to poll. return 0 in this case
- or until the next item is not yet due. return next due time in this case
'''
queue = self.queues[polltype]
if not queue:
return float('inf') # queue is empty
now = time.time()
done = False
while not done:
due, lastdue, pollitem = queue[0]
if now < due:
return due
_, module, pobj, rfunc, factor = pollitem
if polltype == DYNAMIC and not module.isBusy():
interval = module.pollinterval # effective interval
mininterval = interval * factor # interval for calculating next due
else:
interval = module.pollinterval * factor
mininterval = interval
due = max(lastdue + interval, pobj.timestamp + interval * 0.5)
if now >= due:
try:
rfunc()
except Exception: # really all. errors are handled within rfunc
# TODO: filter repeated errors and log just statistics
module.log.error(formatException())
done = True
lastdue = due
due = max(lastdue + mininterval, now + min(self.maxwait, mininterval * 0.5))
# replace due, lastdue with new values and sort in
heapreplace(queue, (due, lastdue, pollitem))
return 0
def run(self, started_callback):
'''start poll loop
To be called as a thread. After all parameters are polled once first,
started_callback is called. To be called in Module.start_module.
poll strategy:
Slow polls are performed with lower priority than regular and dynamic polls.
If more polls are scheduled than time permits, at least every second poll is a
dynamic poll. After every n regular polls, one slow poll is done, if due
(where n is the number of regular parameters).
'''
if not self:
# nothing to do (else we might call time.sleep(float('inf')) below
started_callback()
return
# do all polls once and, at the same time, insert due info
for _, queue in sorted(self.queues.items()): # do SLOW polls first
for idx, (_, _, (_, module, pobj, rfunc, factor)) in enumerate(queue):
lastdue = time.time()
try:
rfunc()
except Exception: # really all. errors are handled within rfunc
module.log.error(formatException())
due = lastdue + min(self.maxwait, module.pollinterval * factor)
# in python 3 comparing tuples need some care, as not all objects
# are comparable. Inserting a unique idx solves the problem.
queue[idx] = (due, lastdue, (idx, module, pobj, rfunc, factor))
heapify(queue)
started_callback() # signal end of startup
nregular = len(self.queues[REGULAR])
while not self._stopped.is_set():
due = float('inf')
for _ in range(nregular):
due = min(self.poll_next(DYNAMIC), self.poll_next(REGULAR))
if due:
break # no dynamic or regular polls due
due = min(due, self.poll_next(DYNAMIC), self.poll_next(SLOW))
delay = due - time.time()
if delay > 0:
self._stopped.wait(delay)
def stop(self):
self._stopped.set()
def __bool__(self):
'''is there any poll item?'''
return any(self.queues.values())
__nonzero__ = __bool__ # Py2/3 compat

View File

@ -181,10 +181,15 @@ class Server(object):
raise ConfigError(u'cfgfile %r: needs exactly one node section!' % self._cfgfile)
self.dispatcher = self.nodes.values()[0]
pollTable = dict()
# all objs created, now start them up and interconnect
for modname, modobj in self.modules.items():
self.log.info(u'registering module %r' % modname)
self.dispatcher.register_module(modobj, modname, modobj.properties['export'])
try:
modobj.pollerClass.add_to_table(pollTable, modobj)
except AttributeError:
pass
# also call earlyInit on the modules
modobj.earlyInit()
@ -197,9 +202,14 @@ class Server(object):
event = threading.Event()
# startModule must return either a timeout value or None (default 30 sec)
timeout = modobj.startModule(started_callback=event.set) or 30
start_events.append((time.time() + timeout, modname, event))
self.log.info(u'waiting for modules being started')
for deadline, modname, event in sorted(start_events):
start_events.append((time.time() + timeout, 'module %s' % modname, event))
for poller in pollTable.values():
event = threading.Event()
# poller.start must return either a timeout value or None (default 30 sec)
timeout = poller.start(started_callback=event.set) or 30
start_events.append((time.time() + timeout, repr(poller), event))
self.log.info(u'waiting for modules and pollers being started')
for deadline, name, event in sorted(start_events):
if not event.wait(timeout=max(0, deadline - time.time())):
self.log.info('WARNING: timeout when starting module %s' % modname)
self.log.info(u'all modules started')
self.log.info('WARNING: timeout when starting %s' % name)
self.log.info(u'all modules and pollers started')

233
test/test_poller.py Normal file
View File

@ -0,0 +1,233 @@
# -*- coding: utf-8 -*-
# *****************************************************************************
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
#
# *****************************************************************************
"""test poller."""
import time
from collections import OrderedDict
import pytest
from secop.modules import Drivable
from secop.poller import Poller, REGULAR, DYNAMIC, SLOW
Status = Drivable.Status
class Time(object):
STARTTIME = 1000 # artificial time zero
def __init__(self):
self.reset()
self.finish = float('inf')
self.stop = lambda : None
self.commtime = 0.05 # time needed for 1 poll
def reset(self, lifetime=10):
self.seconds = self.STARTTIME
self.idletime = 0.0
self.busytime = 0.0
self.finish = self.STARTTIME + lifetime
def time(self):
if self.seconds > self.finish:
self.finish = float('inf')
self.stop()
return self.seconds
def sleep(self, seconds):
assert 0 <= seconds <= 24*3600
self.idletime += seconds
self.seconds += seconds
def busy(self, seconds):
assert seconds >= 0
self.seconds += seconds
self.busytime += seconds
artime = Time() # artificial test time
@pytest.fixture(autouse=True)
def patch_time(monkeypatch):
monkeypatch.setattr(time, 'time', artime.time)
class Event(object):
def __init__(self):
self.flag = False
def wait(self, timeout):
artime.sleep(max(0,timeout))
def set(self):
self.flag=True
def is_set(self):
return self.flag
class Parameter(object):
def __init__(self, name, readonly, poll, polltype, interval):
self.poll = poll
self.polltype = polltype # used for check only
self.export = name
self.readonly = readonly
self.interval = interval
self.timestamp = 0
self.reset()
def reset(self):
self.cnt = 0
self.span = 0
self.maxspan = 0
def rfunc(self):
artime.busy(artime.commtime)
now = artime.time()
self.span = now - self.timestamp
self.maxspan = max(self.maxspan, self.span)
self.timestamp = now
self.cnt += 1
return True
def __repr__(self):
return 'Parameter(%s)' % ", ".join("%s=%r" % item for item in self.__dict__.items())
class Module(object):
properties = {}
pollerClass = Poller
iodev = 'common_iodev'
def __init__(self, name, pollinterval=5, fastfactor=0.25, slowfactor=4, busy=False,
counts=(), auto=None):
'''create a dummy module
nauto, ndynamic, nregular, nslow are the number of parameters of each polltype
'''
self.pollinterval = pollinterval
self.fast_pollfactor = fastfactor
self.slow_pollfactor = slowfactor
self.parameters = OrderedDict()
self.name = name
self.is_busy = busy
if auto is not None:
self.pvalue = self.addPar('value', True, auto or DYNAMIC, DYNAMIC)
# readonly = False should not matter:
self.pstatus = self.addPar('status', False, auto or DYNAMIC, DYNAMIC)
self.pregular = self.addPar('regular', True, auto or REGULAR, REGULAR)
self.pslow = self.addPar('slow', False, auto or SLOW, SLOW)
self.addPar('notpolled', True, False, 0)
self.counts = 'auto'
else:
ndynamic, nregular, nslow = counts
for i in range(ndynamic):
self.addPar('%s:d%d' % (name, i), True, DYNAMIC, DYNAMIC)
for i in range(nregular):
self.addPar('%s:r%d' % (name, i), True, REGULAR, REGULAR)
for i in range(nslow):
self.addPar('%s:s%d' % (name, i), False, SLOW, SLOW)
self.counts = counts
def addPar(self, name, readonly, poll, expected_polltype):
# self.count[polltype] += 1
expected_interval = self.pollinterval
if expected_polltype == SLOW:
expected_interval *= self.slow_pollfactor
elif expected_polltype == DYNAMIC and self.is_busy:
expected_interval *= self.fast_pollfactor
pobj = Parameter(name, readonly, poll, expected_polltype, expected_interval)
setattr(self, 'read_' + pobj.export, pobj.rfunc)
self.parameters[pobj.export] = pobj
return pobj
def isBusy(self):
return self.is_busy
def __repr__(self):
rdict = self.__dict__.copy()
rdict.pop('parameters')
return 'Module(%r, counts=%r, f=%r, pollinterval=%g, is_busy=%r)' % (self.name,
self.counts, (self.fast_pollfactor, self.slow_pollfactor, 1),
self.pollinterval, self.is_busy)
module_list = [
[Module('x', 3.0, 0.125, 10, False, auto=True),
Module('y', 3.0, 0.125, 10, False, auto=False)],
[Module('a', 1.0, 0.25, 4, True, (5, 5, 10)),
Module('b', 2.0, 0.25, 4, True, (5, 5, 50))],
[Module('c', 1.0, 0.25, 4, False, (5, 0, 0))],
[Module('d', 1.0, 0.25, 4, True, (0, 9, 0))],
[Module('e', 1.0, 0.25, 4, True, (0, 0, 9))],
[Module('f', 1.0, 0.25, 4, True, (0, 0, 0))],
]
@pytest.mark.parametrize('modules', module_list)
def test_Poller(modules):
# check for proper timing
for overloaded in False, True:
artime.reset()
count = {DYNAMIC: 0, REGULAR: 0, SLOW: 0}
maxspan = {DYNAMIC: 0, REGULAR: 0, SLOW: 0}
pollTable = dict()
for module in modules:
Poller.add_to_table(pollTable, module)
for pobj in module.parameters.values():
if pobj.poll:
maxspan[pobj.polltype] = max(maxspan[pobj.polltype], pobj.interval)
count[pobj.polltype] += 1
pobj.reset()
assert len(pollTable) == 1
poller = pollTable[(Poller, 'common_iodev')]
artime.stop = poller.stop
poller._stopped = Event() # patch Event.wait
assert (sum(count.values()) > 0) == bool(poller)
def started_callback(modules=modules):
for module in modules:
for pobj in module.parameters.values():
assert pobj.cnt == bool(pobj.poll) # all parameters have to be polled once
pobj.reset() # set maxspan and cnt to 0
if overloaded:
# overloaded scenario
artime.commtime = 1.0
ncycles = 10
if count[SLOW] > 0:
cycletime = (count[REGULAR] + 1) * count[SLOW] * 2
else:
cycletime = max(count[REGULAR], count[DYNAMIC]) * 2
artime.reset(cycletime * ncycles * 1.01) # poller will quit given time
poller.run(started_callback)
total = artime.time() - artime.STARTTIME
for module in modules:
for pobj in module.parameters.values():
if pobj.poll:
# average_span = total / (pobj.cnt + 1)
assert total / (pobj.cnt + 1) <= max(cycletime, pobj.interval * 1.1)
else:
# normal scenario
artime.commtime = 0.001
artime.reset(max(maxspan.values()) * 5) # poller will quit given time
poller.run(started_callback)
total = artime.time() - artime.STARTTIME
for module in modules:
for pobj in module.parameters.values():
if pobj.poll:
assert pobj.maxspan <= maxspan[pobj.polltype] * 1.1
assert (pobj.cnt + 1) * pobj.interval >= total * 0.99
assert abs(pobj.span - pobj.interval) < 0.01
pobj.reset()