From f6f2dd189b6ce96e005082d1639d981e1db90353 Mon Sep 17 00:00:00 2001 From: Markus Zolliker Date: Mon, 28 Mar 2022 18:14:06 +0200 Subject: [PATCH] use a common poller thread for modules sharing io When several poller threads are using the same io, the resposivity of client requests is reduced, as every thread first finishes its pending communication requests, before it is the turn of the request thread. This is solved by using one common poller thread for all modules sharing the same communicator. + fix an issue with overriding a property with a parameter, as this is the case for pollperiod (cfg was applied to property instead of overriding parameter) + separate setFastPoll arguments into flag and fast interval + fix missing announceUpdate call when read function fails + fix mechanism for triggering polls after an io connection reconnected again. Change-Id: I1115a61fae3de80d18416e61f40b52a0eebb637c Reviewed-on: https://forge.frm2.tum.de/review/c/sine2020/secop/playground/+/28021 Tested-by: Jenkins Automated Tests Reviewed-by: Markus Zolliker --- secop/io.py | 6 +- secop/modules.py | 175 +++++++++++++++++++++++++++---------------- secop/properties.py | 7 +- test/test_modules.py | 10 ++- test/test_poller.py | 68 +++++++---------- 5 files changed, 153 insertions(+), 113 deletions(-) diff --git a/secop/io.py b/secop/io.py index 695643f..d7e09df 100644 --- a/secop/io.py +++ b/secop/io.py @@ -118,6 +118,7 @@ class IOBase(Communicator): def earlyInit(self): super().earlyInit() + self._reconnectCallbacks = {} self._lock = threading.RLock() def connectStart(self): @@ -171,10 +172,7 @@ class IOBase(Communicator): if the callback fails or returns False, it is cleared """ - if self._reconnectCallbacks is None: - self._reconnectCallbacks = {name: func} - else: - self._reconnectCallbacks[name] = func + self._reconnectCallbacks[name] = func def callCallbacks(self): for key, cb in list(self._reconnectCallbacks.items()): diff --git a/secop/modules.py b/secop/modules.py index f7b4436..101f50e 100644 --- a/secop/modules.py +++ b/secop/modules.py @@ -24,7 +24,6 @@ import time -from queue import Queue, Empty import threading from collections import OrderedDict from functools import wraps @@ -144,6 +143,7 @@ class HasAccessibles(HasProperties): self.log.debug("read_%s returned %r", pname, value) except Exception as e: self.log.debug("read_%s failed with %r", pname, e) + self.announceUpdate(pname, None, e) raise if value is Done: return getattr(self, pname) @@ -228,6 +228,26 @@ class HasAccessibles(HasProperties): cls.configurables = res +class PollInfo: + def __init__(self, pollinterval, trigger_event): + self.interval = pollinterval + self.last_main = 0 + self.last_slow = 0 + self.last_error = None + self.polled_parameters = [] + self.fast_flag = False + self.trigger_event = trigger_event + + def trigger(self): + """trigger a recalculation of poll due times""" + self.trigger_event.set() + + def update_interval(self, pollinterval): + if not self.fast_flag: + self.interval = pollinterval + self.trigger() + + class Module(HasAccessibles): """basic module @@ -283,6 +303,8 @@ class Module(HasAccessibles): # reference to the dispatcher (used for sending async updates) DISPATCHER = None attachedModules = None + pollInfo = None + triggerPoll = None # trigger event for polls. used on io modules and modules without io def __init__(self, name, logger, cfgdict, srv): # remember the dispatcher object (for the async callbacks) @@ -296,8 +318,8 @@ class Module(HasAccessibles): self.initModuleDone = False self.startModuleDone = False self.remoteLogHandler = None - self.changePollinterval = Queue() # used for waiting between polls and transmit info to the thread self.accessLock = threading.RLock() + self.polledModules = [] # modules polled by thread started in self.startModules errors = [] # handle module properties @@ -568,6 +590,13 @@ class Module(HasAccessibles): def initModule(self): """initialise module with stuff to be done after all modules are created""" self.initModuleDone = True + if self.enablePoll or self.writeDict: + # enablePoll == False: we still need the poll thread for writing values from writeDict + if hasattr(self, 'io'): + self.io.polledModules.append(self) + else: + self.triggerPoll = threading.Event() + self.polledModules = [self] def startModule(self, start_events): """runs after init of all modules @@ -578,9 +607,8 @@ class Module(HasAccessibles): registers it in the server for waiting defaults to 30 seconds """ - if self.enablePoll or self.writeDict: - # enablePoll == False: start poll thread for writing values from writeDict only - mkthread(self.__pollThread, start_events.get_trigger()) + if self.polledModules: + mkthread(self.__pollThread, self.polledModules, start_events.get_trigger()) self.startModuleDone = True def doPoll(self): @@ -589,17 +617,16 @@ class Module(HasAccessibles): all other parameters are polled automatically """ - def setFastPoll(self, pollinterval): + def setFastPoll(self, flag, fast_interval=0.25): """change poll interval - :param pollinterval: a new (typically lower) pollinterval - special values: True: set to 0.25 (default fast poll interval) - False: set to self.pollinterval (value for idle) + :param flag: enable/disable fast poll mode + :param fast_interval: fast poll interval """ - if pollinterval is False: - self.changePollinterval.put(self.pollinterval) - return - self.changePollinterval.put(0.25 if pollinterval is True else pollinterval) + if self.pollInfo: + self.pollInfo.fast_flag = flag + self.pollInfo.interval = fast_interval if flag else self.pollinterval + self.pollInfo.trigger() def callPollFunc(self, rfunc): """call read method with proper error handling""" @@ -612,64 +639,92 @@ class Module(HasAccessibles): except Exception: self.log.error(formatException()) - def __pollThread(self, started_callback): - self.writeInitParams() - if not self.enablePoll: + def __pollThread(self, modules, started_callback): + """poll thread body + + :param modules: list of modules to be handled by this thread + :param started_callback: to be called after all polls are done once + + before polling, parameters which need hardware initialisation are written + """ + for mobj in modules: + mobj.writeInitParams() + modules = [m for m in modules if m.enablePoll] + if not modules: # no polls needed - exit thread + started_callback() return - polled_parameters = [] + if hasattr(self, 'registerReconnectCallback'): + # self is a communicator supporting reconnections + def trigger_all(trg=self.triggerPoll, polled_modules=modules): + for m in polled_modules: + m.pollInfo.last_main = 0 + m.pollInfo.last_slow = 0 + trg.set() + self.registerReconnectCallback('trigger_polls', trigger_all) + # collect and call all read functions a first time - for pname, pobj in self.parameters.items(): - rfunc = getattr(self, 'read_' + pname) - if rfunc.poll: - polled_parameters.append((rfunc, pobj)) - self.callPollFunc(rfunc) + for mobj in modules: + pinfo = mobj.pollInfo = PollInfo(mobj.pollinterval, self.triggerPoll) + # trigger a poll interval change when self.pollinterval changes. + if 'pollinterval' in mobj.valueCallbacks: + mobj.valueCallbacks['pollinterval'].append(pinfo.update_interval) + + for pname, pobj in mobj.parameters.items(): + rfunc = getattr(mobj, 'read_' + pname) + if rfunc.poll: + pinfo.polled_parameters.append((mobj, rfunc, pobj)) + mobj.callPollFunc(rfunc) started_callback() - pollinterval = self.pollinterval - last_slow = last_main = 0 - last_error = None - error_count = 0 to_poll = () while True: now = time.time() - wait_main = last_main + pollinterval - now - wait_slow = last_slow + self.slowinterval - now - wait_time = min(wait_main, wait_slow) + wait_time = 999 + for mobj in modules: + pinfo = mobj.pollInfo + wait_time = min(pinfo.last_main + pinfo.interval - now, wait_time, + pinfo.last_slow + mobj.slowinterval - now) if wait_time > 0: - try: - result = self.changePollinterval.get(timeout=wait_time) - except Empty: - result = None - if result is not None: - pollinterval = result + self.triggerPoll.wait(wait_time) + self.triggerPoll.clear() continue - # call doPoll, if due - if wait_main <= 0: - last_main = (now // pollinterval) * pollinterval - try: - self.doPoll() - if last_error and error_count > 1: - self.log.info('recovered after %d calls to doPoll (%r)', error_count, last_error) - last_error = None - except Exception as e: - if repr(e) != last_error: - error_count = 0 - self.log.error('error in doPoll: %r', e) - error_count += 1 - last_error = repr(e) + # call doPoll of all modules where due + for mobj in modules: + pinfo = mobj.pollInfo + if now > pinfo.last_main + pinfo.interval: + pinfo.last_main = (now // pinfo.interval) * pinfo.interval + try: + mobj.doPoll() + pinfo.last_error = None + except Exception as e: + if str(e) != str(pinfo.last_error) and not isinstance(e, SilentError): + mobj.log.error('doPoll: %r', e) + pinfo.last_error = e now = time.time() # find ONE due slow poll and call it loop = True while loop: # loops max. 2 times, when to_poll is at end - for rfunc, pobj in to_poll: - if now > pobj.timestamp + self.slowinterval * 0.5: - self.callPollFunc(rfunc) - loop = False + for mobj, rfunc, pobj in to_poll: + if now > pobj.timestamp + mobj.slowinterval * 0.5: + try: + prev_err = pobj.readerror + rfunc() + except Exception as e: + if not isinstance(e, SilentError) and str(pobj.readerror) != str(prev_err): + mobj.log.error('%s: %r', pobj.name, e) + loop = False # one poll done break else: - if now < last_slow + self.slowinterval: - break - last_slow = (now // self.slowinterval) * self.slowinterval - to_poll = iter(polled_parameters) + to_poll = [] + # collect due slow polls + for mobj in modules: + pinfo = mobj.pollInfo + if now > pinfo.last_slow + mobj.slowinterval: + to_poll.extend(pinfo.polled_parameters) + pinfo.last_slow = (now // mobj.slowinterval) * mobj.slowinterval + if to_poll: + to_poll = iter(to_poll) + else: + loop = False # no slow polls ready def writeInitParams(self, started_callback=None): """write values for parameters with configured values @@ -722,12 +777,6 @@ class Readable(Module): pollinterval = Parameter('default poll interval', FloatRange(0.1, 120), default=5, readonly=False, export=True) - def earlyInit(self): - super().earlyInit() - # trigger a poll interval change when self.pollinterval changes. - # self.setFastPoll with a float argument does the job here - self.valueCallbacks['pollinterval'].append(self.setFastPoll) - def doPoll(self): self.read_value() self.read_status() diff --git a/secop/properties.py b/secop/properties.py index f12d5a9..d491f65 100644 --- a/secop/properties.py +++ b/secop/properties.py @@ -136,10 +136,11 @@ class HasProperties(HasDescriptors): properties.update({k: v for k, v in base.__dict__.items() if isinstance(v, Property)}) cls.propertyDict = properties # treat overriding properties with bare values - for pn, po in properties.items(): + for pn, po in list(properties.items()): value = getattr(cls, pn, po) - if not isinstance(value, (Property, HasProperties)): # attribute may be a bare value - # HasProperties is a base class of Parameter -> allow a Parameter to override a Property () + if isinstance(value, HasProperties): # value is a Parameter, allow override + properties.pop(pn) + elif not isinstance(value, Property): # attribute may be a bare value po = po.copy() try: # try to apply bare value to Property diff --git a/test/test_modules.py b/test/test_modules.py index 7760404..c2230e1 100644 --- a/test/test_modules.py +++ b/test/test_modules.py @@ -69,6 +69,7 @@ class ServerStub: class DummyMultiEvent(threading.Event): def get_trigger(self): + def trigger(event=self): event.set() sys.exit() @@ -80,8 +81,9 @@ def test_Communicator(): o.earlyInit() o.initModule() event = DummyMultiEvent() + o.initModule() o.startModule(event) - assert event.is_set() # event should be set immediately + assert event.wait(timeout=0.1) def test_ModuleMagic(): @@ -193,8 +195,9 @@ def test_ModuleMagic(): assert updates.pop('o1') == expectedBeforeStart o1.earlyInit() event = DummyMultiEvent() + o1.initModule() o1.startModule(event) - event.wait() + assert event.wait(timeout=0.1) # should contain polled values expectedAfterStart = { 'status': (Drivable.Status.IDLE, 'ok'), 'value': 'second', @@ -209,8 +212,9 @@ def test_ModuleMagic(): assert updates.pop('o2') == expectedBeforeStart o2.earlyInit() event = DummyMultiEvent() + o2.initModule() o2.startModule(event) - event.wait() + assert event.wait(timeout=0.1) # value has changed type, b2 and a1 are written expectedAfterStart.update(value=0, b2='EMPTY', a1=True) # ramerk: a1=True: this behaviour is a Porgamming error diff --git a/test/test_poller.py b/test/test_poller.py index ae94e29..bea914c 100644 --- a/test/test_poller.py +++ b/test/test_poller.py @@ -23,6 +23,7 @@ import sys import threading +from time import time as current_time import time import logging @@ -33,43 +34,19 @@ from secop.lib.multievent import MultiEvent class Time: - STARTTIME = 1000 # artificial time zero - + """artificial time, forwarded on sleep instead of waiting""" def __init__(self): - self.reset() - self.finish = float('inf') - self.stop = lambda : None - self.commtime = 0.05 # time needed for 1 poll - - def reset(self, lifetime=10): - self.seconds = self.STARTTIME - self.idletime = 0.0 - self.busytime = 0.0 - self.finish = self.STARTTIME + lifetime + self.offset = 0 def time(self): - if self.seconds > self.finish: - self.finish = float('inf') - self.stop() - return self.seconds + return current_time() + self.offset def sleep(self, seconds): assert 0 <= seconds <= 24*3600 - self.idletime += seconds - self.seconds += seconds - - def busy(self, seconds): - assert seconds >= 0 - self.seconds += seconds - self.busytime += seconds + self.offset += seconds -artime = Time() # artificial test time - - -class Event(threading.Event): - def wait(self, timeout=None): - artime.sleep(max(0, timeout)) +artime = Time() # artificial test time class DispatcherStub: @@ -97,13 +74,25 @@ class Base(Module): srv = ServerStub() super().__init__('mod', logging.getLogger('dummy'), dict(description=''), srv) self.dispatcher = srv.dispatcher - self.nextPollEvent = Event() def run(self, maxcycles): self.dispatcher.maxcycles = maxcycles self.dispatcher.finish_event = threading.Event() + self.initModule() + + def wait(timeout=None, base=self.triggerPoll): + """simplified simulation + + when an event is already set return True, else forward artificial time + """ + if base.is_set(): + return True + artime.sleep(max(0.0, 99.9 if timeout is None else timeout)) + return base.is_set() + + self.triggerPoll.wait = wait self.startModule(MultiEvent()) - self.dispatcher.finish_event.wait(1) + assert self.dispatcher.finish_event.wait(1) class Mod1(Base, Readable): @@ -132,14 +121,13 @@ class Mod1(Base, Readable): @pytest.mark.parametrize( 'ncycles, pollinterval, slowinterval, mspan, pspan', - [ # normal case: 5+-1 15+-1 - ( 60, 5, 15, (4, 6), (14, 16)), - # pollinterval faster then reading: mspan max 3 s (polls of value, status and ONE other parameter) - ( 60, 1, 5, (1, 3), (5, 16)), + [ # normal case: + ( 60, 5, 15, (4.9, 5.1), (14, 16)), + # pollinterval faster then reading: mspan max ~ 3 s (polls of value, status and ONE other parameter) + ( 60, 1, 5, (0.9, 3.1), (5, 17)), ]) def test_poll(ncycles, pollinterval, slowinterval, mspan, pspan, monkeypatch): monkeypatch.setattr(time, 'time', artime.time) - artime.reset() m = Mod1() m.pollinterval = pollinterval m.slowInterval = slowinterval @@ -148,18 +136,18 @@ def test_poll(ncycles, pollinterval, slowinterval, mspan, pspan, monkeypatch): for pname in ['value', 'status']: pobj = m.parameters[pname] lowcnt = 0 + print(pname, [t2 - t1 for t1, t2 in zip(pobj.stat[1:], pobj.stat[2:-1])]) for t1, t2 in zip(pobj.stat[1:], pobj.stat[2:-1]): if t2 - t1 < mspan[0]: - print(t2 - t1) lowcnt += 1 assert t2 - t1 <= mspan[1] - assert lowcnt <= 1 + assert lowcnt <= 2 for pname in ['param1', 'param2', 'param3']: pobj = m.parameters[pname] lowcnt = 0 + print(pname, [t2 - t1 for t1, t2 in zip(pobj.stat[1:], pobj.stat[2:-1])]) for t1, t2 in zip(pobj.stat[1:], pobj.stat[2:-1]): if t2 - t1 < pspan[0]: - print(pname, t2 - t1) lowcnt += 1 assert t2 - t1 <= pspan[1] - assert lowcnt <= 1 + assert lowcnt <= 2