From f21fcfce9dd657c028c7c52142c711d17a169c4b Mon Sep 17 00:00:00 2001 From: Markus Zolliker Date: Fri, 23 Aug 2019 10:12:58 +0200 Subject: [PATCH] improved poller the poller is now a separate (python-)module - 3 categories of polled parameters: - regular - dynamic: accelerated when module is busy - slow: less frequent, with lower priority - several modules might share a poller. this makes sense, when a they share a common communicator - modules using the new poller must define pollerClass - pollerClass may be define even an other poller implementation - the old simple poller is still used on Readables without pollerClass - the poller can not be configured in the config file only, as the poll attributes have special values for this poller Change-Id: Ifd811beeb45733bcbf801608ade877a3a601c24a Reviewed-on: https://forge.frm2.tum.de/review/c/sine2020/secop/playground/+/21066 Reviewed-by: Markus Zolliker Tested-by: JenkinsCodeReview --- secop/modules.py | 6 +- secop/poller.py | 249 ++++++++++++++++++++++++++++++++++++++++++++ secop/server.py | 20 +++- test/test_poller.py | 233 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 502 insertions(+), 6 deletions(-) create mode 100644 secop/poller.py create mode 100644 test/test_poller.py diff --git a/secop/modules.py b/secop/modules.py index bdc82f0..fd41b2d 100644 --- a/secop/modules.py +++ b/secop/modules.py @@ -283,7 +283,11 @@ class Readable(Module): def startModule(self, started_callback): '''start polling thread''' - mkthread(self.__pollThread, started_callback) + if hasattr(self, 'pollerClass'): # an other poller is used + started_callback() + else: + # basic poller kept for reference + mkthread(self.__pollThread, started_callback) def __pollThread(self, started_callback): while True: diff --git a/secop/poller.py b/secop/poller.py new file mode 100644 index 0000000..5c2bfa6 --- /dev/null +++ b/secop/poller.py @@ -0,0 +1,249 @@ +# -*- coding: utf-8 -*- +# ***************************************************************************** +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free Software +# Foundation; either version 2 of the License, or (at your option) any later +# version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# this program; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Module authors: +# Markus Zolliker +# +# ***************************************************************************** +'''general, advanced frappy poller + +Usage examples: + any Module which want to be polled with a specific Poller must define + the pollerClass class variable: + + class MyModule(Readable): + ... + pollerClass = poller.Poller + ... + + modules having a parameter 'iodev' with the same value will share the same poller +''' + +import time +from threading import Event +from heapq import heapify, heapreplace +from secop.lib import mkthread, formatException +from secop.errors import ProgrammingError + +# poll types: +AUTO = 1 # equivalent to True, converted to REGULAR, SLOW or DYNAMIC +SLOW = 2 +REGULAR = 3 +DYNAMIC = 4 + +class PollerBase(object): + + startup_timeout = 30 # default timeout for startup + name = 'unknown' # to be overridden in implementors __init__ method + + @classmethod + def add_to_table(cls, table, module): + '''sort module into poller table + + table is a dict, with (, ) as the key, and the + poller as value. + is module.iodev or module.name, if iodev is not present + ''' + try: + pollerClass = module.pollerClass + except AttributeError: + return # no pollerClass -> fall back to simple poller + # for modules with the same iodev, a common poller is used, + # modules without iodev all get their own poller + name = getattr(module, 'iodev', module.name) + poller = table.get((pollerClass, name), None) + if poller is None: + poller = pollerClass(name) + table[(pollerClass, name)] = poller + poller.add_to_poller(module) + + def start(self, started_callback): + '''start poller thread + + started_callback to be called after all poll items were read at least once + ''' + mkthread(self.run, started_callback) + return self.startup_timeout + + def run(self, started_callback): + '''poller thread function + + started_callback to be called after all poll items were read at least once + ''' + raise NotImplementedError + + def stop(self): + '''stop polling''' + raise NotImplementedError + + def __bool__(self): + '''is there any poll item?''' + raise NotImplementedError + + def __repr__(self): + return '%s(%r)' % (self.__class__.__name__, self.name) + + __nonzero__ = __bool__ # Py2/3 compat + + +class Poller(PollerBase): + '''a standard poller + + parameters may have the following polltypes: + + - REGULAR: by default used for readonly parameters with poll=True + - SLOW: by default used for readonly=False parameters with poll=True. + slow polls happen with lower priority, but at least one parameter + is polled with regular priority within self.module.pollinterval. + Scheduled to poll every slowfactor * module.pollinterval + - DYNAMIC: by default used for 'value' and 'status' + When busy, scheduled to poll every fastfactor * module.pollinterval + ''' + + DEFAULT_FACTORS = {SLOW: 4, DYNAMIC: 0.25, REGULAR: 1} + + def __init__(self, name): + '''create a poller''' + self.queues = {polltype: [] for polltype in self.DEFAULT_FACTORS} + self._stopped = Event() + self.maxwait = 3600 + self.name = name + + def add_to_poller(self, module): + factors = self.DEFAULT_FACTORS.copy() + try: + factors[DYNAMIC] = module.fast_pollfactor + except AttributeError: + pass + try: + factors[SLOW] = module.slow_pollfactor + except AttributeError: + pass + self.maxwait = min(self.maxwait, getattr(module, 'max_polltestperiod', 10)) + try: + self.startup_timeout = max(self.startup_timeout, module.startup_timeout) + except AttributeError: + pass + # at the beginning, queues are simple lists + # later, they will be converted to heaps + for pname, pobj in module.parameters.items(): + polltype = int(pobj.poll) + rfunc = getattr(module, 'read_' + pname, None) + if not polltype or not rfunc: + continue + if not hasattr(module, 'pollinterval'): + raise ProgrammingError("module %s must have a pollinterval" + % module.name) + if polltype == AUTO: # covers also pobj.poll == True + if pname == 'value' or pname == 'status': + polltype = DYNAMIC + elif pobj.readonly: + polltype = REGULAR + else: + polltype = SLOW + # placeholders 0 are used for due, lastdue and idx + self.queues[polltype].append((0, 0, + (0, module, pobj, rfunc, factors[polltype]))) + + def poll_next(self, polltype): + '''try to poll next item + + advance in queue until + - an item is found which is really due to poll. return 0 in this case + - or until the next item is not yet due. return next due time in this case + ''' + queue = self.queues[polltype] + if not queue: + return float('inf') # queue is empty + now = time.time() + done = False + while not done: + due, lastdue, pollitem = queue[0] + if now < due: + return due + _, module, pobj, rfunc, factor = pollitem + + if polltype == DYNAMIC and not module.isBusy(): + interval = module.pollinterval # effective interval + mininterval = interval * factor # interval for calculating next due + else: + interval = module.pollinterval * factor + mininterval = interval + due = max(lastdue + interval, pobj.timestamp + interval * 0.5) + if now >= due: + try: + rfunc() + except Exception: # really all. errors are handled within rfunc + # TODO: filter repeated errors and log just statistics + module.log.error(formatException()) + done = True + lastdue = due + due = max(lastdue + mininterval, now + min(self.maxwait, mininterval * 0.5)) + # replace due, lastdue with new values and sort in + heapreplace(queue, (due, lastdue, pollitem)) + return 0 + + def run(self, started_callback): + '''start poll loop + + To be called as a thread. After all parameters are polled once first, + started_callback is called. To be called in Module.start_module. + + poll strategy: + Slow polls are performed with lower priority than regular and dynamic polls. + If more polls are scheduled than time permits, at least every second poll is a + dynamic poll. After every n regular polls, one slow poll is done, if due + (where n is the number of regular parameters). + ''' + if not self: + # nothing to do (else we might call time.sleep(float('inf')) below + started_callback() + return + # do all polls once and, at the same time, insert due info + for _, queue in sorted(self.queues.items()): # do SLOW polls first + for idx, (_, _, (_, module, pobj, rfunc, factor)) in enumerate(queue): + lastdue = time.time() + try: + rfunc() + except Exception: # really all. errors are handled within rfunc + module.log.error(formatException()) + due = lastdue + min(self.maxwait, module.pollinterval * factor) + # in python 3 comparing tuples need some care, as not all objects + # are comparable. Inserting a unique idx solves the problem. + queue[idx] = (due, lastdue, (idx, module, pobj, rfunc, factor)) + heapify(queue) + started_callback() # signal end of startup + nregular = len(self.queues[REGULAR]) + while not self._stopped.is_set(): + due = float('inf') + for _ in range(nregular): + due = min(self.poll_next(DYNAMIC), self.poll_next(REGULAR)) + if due: + break # no dynamic or regular polls due + due = min(due, self.poll_next(DYNAMIC), self.poll_next(SLOW)) + delay = due - time.time() + if delay > 0: + self._stopped.wait(delay) + + def stop(self): + self._stopped.set() + + def __bool__(self): + '''is there any poll item?''' + return any(self.queues.values()) + + __nonzero__ = __bool__ # Py2/3 compat diff --git a/secop/server.py b/secop/server.py index 16c9d14..2a4685a 100644 --- a/secop/server.py +++ b/secop/server.py @@ -181,10 +181,15 @@ class Server(object): raise ConfigError(u'cfgfile %r: needs exactly one node section!' % self._cfgfile) self.dispatcher = self.nodes.values()[0] + pollTable = dict() # all objs created, now start them up and interconnect for modname, modobj in self.modules.items(): self.log.info(u'registering module %r' % modname) self.dispatcher.register_module(modobj, modname, modobj.properties['export']) + try: + modobj.pollerClass.add_to_table(pollTable, modobj) + except AttributeError: + pass # also call earlyInit on the modules modobj.earlyInit() @@ -197,9 +202,14 @@ class Server(object): event = threading.Event() # startModule must return either a timeout value or None (default 30 sec) timeout = modobj.startModule(started_callback=event.set) or 30 - start_events.append((time.time() + timeout, modname, event)) - self.log.info(u'waiting for modules being started') - for deadline, modname, event in sorted(start_events): + start_events.append((time.time() + timeout, 'module %s' % modname, event)) + for poller in pollTable.values(): + event = threading.Event() + # poller.start must return either a timeout value or None (default 30 sec) + timeout = poller.start(started_callback=event.set) or 30 + start_events.append((time.time() + timeout, repr(poller), event)) + self.log.info(u'waiting for modules and pollers being started') + for deadline, name, event in sorted(start_events): if not event.wait(timeout=max(0, deadline - time.time())): - self.log.info('WARNING: timeout when starting module %s' % modname) - self.log.info(u'all modules started') + self.log.info('WARNING: timeout when starting %s' % name) + self.log.info(u'all modules and pollers started') diff --git a/test/test_poller.py b/test/test_poller.py new file mode 100644 index 0000000..1f3ba95 --- /dev/null +++ b/test/test_poller.py @@ -0,0 +1,233 @@ +# -*- coding: utf-8 -*- +# ***************************************************************************** +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free Software +# Foundation; either version 2 of the License, or (at your option) any later +# version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# this program; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Module authors: +# Markus Zolliker +# +# ***************************************************************************** +"""test poller.""" + +import time +from collections import OrderedDict +import pytest +from secop.modules import Drivable +from secop.poller import Poller, REGULAR, DYNAMIC, SLOW +Status = Drivable.Status + +class Time(object): + STARTTIME = 1000 # artificial time zero + def __init__(self): + self.reset() + self.finish = float('inf') + self.stop = lambda : None + self.commtime = 0.05 # time needed for 1 poll + + def reset(self, lifetime=10): + self.seconds = self.STARTTIME + self.idletime = 0.0 + self.busytime = 0.0 + self.finish = self.STARTTIME + lifetime + + def time(self): + if self.seconds > self.finish: + self.finish = float('inf') + self.stop() + return self.seconds + + def sleep(self, seconds): + assert 0 <= seconds <= 24*3600 + self.idletime += seconds + self.seconds += seconds + + def busy(self, seconds): + assert seconds >= 0 + self.seconds += seconds + self.busytime += seconds + +artime = Time() # artificial test time + +@pytest.fixture(autouse=True) +def patch_time(monkeypatch): + monkeypatch.setattr(time, 'time', artime.time) + + +class Event(object): + def __init__(self): + self.flag = False + + def wait(self, timeout): + artime.sleep(max(0,timeout)) + + def set(self): + self.flag=True + + def is_set(self): + return self.flag + + +class Parameter(object): + def __init__(self, name, readonly, poll, polltype, interval): + self.poll = poll + self.polltype = polltype # used for check only + self.export = name + self.readonly = readonly + self.interval = interval + self.timestamp = 0 + self.reset() + + def reset(self): + self.cnt = 0 + self.span = 0 + self.maxspan = 0 + + def rfunc(self): + artime.busy(artime.commtime) + now = artime.time() + self.span = now - self.timestamp + self.maxspan = max(self.maxspan, self.span) + self.timestamp = now + self.cnt += 1 + return True + + def __repr__(self): + return 'Parameter(%s)' % ", ".join("%s=%r" % item for item in self.__dict__.items()) + + +class Module(object): + properties = {} + pollerClass = Poller + iodev = 'common_iodev' + def __init__(self, name, pollinterval=5, fastfactor=0.25, slowfactor=4, busy=False, + counts=(), auto=None): + '''create a dummy module + + nauto, ndynamic, nregular, nslow are the number of parameters of each polltype + ''' + self.pollinterval = pollinterval + self.fast_pollfactor = fastfactor + self.slow_pollfactor = slowfactor + self.parameters = OrderedDict() + self.name = name + self.is_busy = busy + if auto is not None: + self.pvalue = self.addPar('value', True, auto or DYNAMIC, DYNAMIC) + # readonly = False should not matter: + self.pstatus = self.addPar('status', False, auto or DYNAMIC, DYNAMIC) + self.pregular = self.addPar('regular', True, auto or REGULAR, REGULAR) + self.pslow = self.addPar('slow', False, auto or SLOW, SLOW) + self.addPar('notpolled', True, False, 0) + self.counts = 'auto' + else: + ndynamic, nregular, nslow = counts + for i in range(ndynamic): + self.addPar('%s:d%d' % (name, i), True, DYNAMIC, DYNAMIC) + for i in range(nregular): + self.addPar('%s:r%d' % (name, i), True, REGULAR, REGULAR) + for i in range(nslow): + self.addPar('%s:s%d' % (name, i), False, SLOW, SLOW) + self.counts = counts + + def addPar(self, name, readonly, poll, expected_polltype): + # self.count[polltype] += 1 + expected_interval = self.pollinterval + if expected_polltype == SLOW: + expected_interval *= self.slow_pollfactor + elif expected_polltype == DYNAMIC and self.is_busy: + expected_interval *= self.fast_pollfactor + pobj = Parameter(name, readonly, poll, expected_polltype, expected_interval) + setattr(self, 'read_' + pobj.export, pobj.rfunc) + self.parameters[pobj.export] = pobj + return pobj + + def isBusy(self): + return self.is_busy + + def __repr__(self): + rdict = self.__dict__.copy() + rdict.pop('parameters') + return 'Module(%r, counts=%r, f=%r, pollinterval=%g, is_busy=%r)' % (self.name, + self.counts, (self.fast_pollfactor, self.slow_pollfactor, 1), + self.pollinterval, self.is_busy) + +module_list = [ + [Module('x', 3.0, 0.125, 10, False, auto=True), + Module('y', 3.0, 0.125, 10, False, auto=False)], + [Module('a', 1.0, 0.25, 4, True, (5, 5, 10)), + Module('b', 2.0, 0.25, 4, True, (5, 5, 50))], + [Module('c', 1.0, 0.25, 4, False, (5, 0, 0))], + [Module('d', 1.0, 0.25, 4, True, (0, 9, 0))], + [Module('e', 1.0, 0.25, 4, True, (0, 0, 9))], + [Module('f', 1.0, 0.25, 4, True, (0, 0, 0))], + ] +@pytest.mark.parametrize('modules', module_list) +def test_Poller(modules): + # check for proper timing + + for overloaded in False, True: + artime.reset() + count = {DYNAMIC: 0, REGULAR: 0, SLOW: 0} + maxspan = {DYNAMIC: 0, REGULAR: 0, SLOW: 0} + pollTable = dict() + for module in modules: + Poller.add_to_table(pollTable, module) + for pobj in module.parameters.values(): + if pobj.poll: + maxspan[pobj.polltype] = max(maxspan[pobj.polltype], pobj.interval) + count[pobj.polltype] += 1 + pobj.reset() + assert len(pollTable) == 1 + poller = pollTable[(Poller, 'common_iodev')] + artime.stop = poller.stop + poller._stopped = Event() # patch Event.wait + + assert (sum(count.values()) > 0) == bool(poller) + + def started_callback(modules=modules): + for module in modules: + for pobj in module.parameters.values(): + assert pobj.cnt == bool(pobj.poll) # all parameters have to be polled once + pobj.reset() # set maxspan and cnt to 0 + + if overloaded: + # overloaded scenario + artime.commtime = 1.0 + ncycles = 10 + if count[SLOW] > 0: + cycletime = (count[REGULAR] + 1) * count[SLOW] * 2 + else: + cycletime = max(count[REGULAR], count[DYNAMIC]) * 2 + artime.reset(cycletime * ncycles * 1.01) # poller will quit given time + poller.run(started_callback) + total = artime.time() - artime.STARTTIME + for module in modules: + for pobj in module.parameters.values(): + if pobj.poll: + # average_span = total / (pobj.cnt + 1) + assert total / (pobj.cnt + 1) <= max(cycletime, pobj.interval * 1.1) + else: + # normal scenario + artime.commtime = 0.001 + artime.reset(max(maxspan.values()) * 5) # poller will quit given time + poller.run(started_callback) + total = artime.time() - artime.STARTTIME + for module in modules: + for pobj in module.parameters.values(): + if pobj.poll: + assert pobj.maxspan <= maxspan[pobj.polltype] * 1.1 + assert (pobj.cnt + 1) * pobj.interval >= total * 0.99 + assert abs(pobj.span - pobj.interval) < 0.01 + pobj.reset()