use a common poller thread for modules sharing io

When several poller threads are using the same io, the resposivity
of client requests is reduced, as every thread first finishes
its pending communication requests, before it is the turn of the
request thread. This is solved by using one common poller thread
for all modules sharing the same communicator.

+ fix an issue with overriding a property with a parameter, as
  this is the case for pollperiod (cfg was applied to property
  instead of overriding parameter)
+ separate setFastPoll arguments into flag and fast interval
+ fix missing announceUpdate call when read function fails
+ fix mechanism for triggering polls after an io connection
  reconnected again.

Change-Id: I1115a61fae3de80d18416e61f40b52a0eebb637c
Reviewed-on: https://forge.frm2.tum.de/review/c/sine2020/secop/playground/+/28021
Tested-by: Jenkins Automated Tests <pedersen+jenkins@frm2.tum.de>
Reviewed-by: Markus Zolliker <markus.zolliker@psi.ch>
This commit is contained in:
zolliker 2022-03-28 18:14:06 +02:00 committed by l_samenv
parent d5924567da
commit f6f2dd189b
5 changed files with 153 additions and 113 deletions

View File

@ -118,6 +118,7 @@ class IOBase(Communicator):
def earlyInit(self):
super().earlyInit()
self._reconnectCallbacks = {}
self._lock = threading.RLock()
def connectStart(self):
@ -171,9 +172,6 @@ class IOBase(Communicator):
if the callback fails or returns False, it is cleared
"""
if self._reconnectCallbacks is None:
self._reconnectCallbacks = {name: func}
else:
self._reconnectCallbacks[name] = func
def callCallbacks(self):

View File

@ -24,7 +24,6 @@
import time
from queue import Queue, Empty
import threading
from collections import OrderedDict
from functools import wraps
@ -144,6 +143,7 @@ class HasAccessibles(HasProperties):
self.log.debug("read_%s returned %r", pname, value)
except Exception as e:
self.log.debug("read_%s failed with %r", pname, e)
self.announceUpdate(pname, None, e)
raise
if value is Done:
return getattr(self, pname)
@ -228,6 +228,26 @@ class HasAccessibles(HasProperties):
cls.configurables = res
class PollInfo:
def __init__(self, pollinterval, trigger_event):
self.interval = pollinterval
self.last_main = 0
self.last_slow = 0
self.last_error = None
self.polled_parameters = []
self.fast_flag = False
self.trigger_event = trigger_event
def trigger(self):
"""trigger a recalculation of poll due times"""
self.trigger_event.set()
def update_interval(self, pollinterval):
if not self.fast_flag:
self.interval = pollinterval
self.trigger()
class Module(HasAccessibles):
"""basic module
@ -283,6 +303,8 @@ class Module(HasAccessibles):
# reference to the dispatcher (used for sending async updates)
DISPATCHER = None
attachedModules = None
pollInfo = None
triggerPoll = None # trigger event for polls. used on io modules and modules without io
def __init__(self, name, logger, cfgdict, srv):
# remember the dispatcher object (for the async callbacks)
@ -296,8 +318,8 @@ class Module(HasAccessibles):
self.initModuleDone = False
self.startModuleDone = False
self.remoteLogHandler = None
self.changePollinterval = Queue() # used for waiting between polls and transmit info to the thread
self.accessLock = threading.RLock()
self.polledModules = [] # modules polled by thread started in self.startModules
errors = []
# handle module properties
@ -568,6 +590,13 @@ class Module(HasAccessibles):
def initModule(self):
"""initialise module with stuff to be done after all modules are created"""
self.initModuleDone = True
if self.enablePoll or self.writeDict:
# enablePoll == False: we still need the poll thread for writing values from writeDict
if hasattr(self, 'io'):
self.io.polledModules.append(self)
else:
self.triggerPoll = threading.Event()
self.polledModules = [self]
def startModule(self, start_events):
"""runs after init of all modules
@ -578,9 +607,8 @@ class Module(HasAccessibles):
registers it in the server for waiting
<timeout> defaults to 30 seconds
"""
if self.enablePoll or self.writeDict:
# enablePoll == False: start poll thread for writing values from writeDict only
mkthread(self.__pollThread, start_events.get_trigger())
if self.polledModules:
mkthread(self.__pollThread, self.polledModules, start_events.get_trigger())
self.startModuleDone = True
def doPoll(self):
@ -589,17 +617,16 @@ class Module(HasAccessibles):
all other parameters are polled automatically
"""
def setFastPoll(self, pollinterval):
def setFastPoll(self, flag, fast_interval=0.25):
"""change poll interval
:param pollinterval: a new (typically lower) pollinterval
special values: True: set to 0.25 (default fast poll interval)
False: set to self.pollinterval (value for idle)
:param flag: enable/disable fast poll mode
:param fast_interval: fast poll interval
"""
if pollinterval is False:
self.changePollinterval.put(self.pollinterval)
return
self.changePollinterval.put(0.25 if pollinterval is True else pollinterval)
if self.pollInfo:
self.pollInfo.fast_flag = flag
self.pollInfo.interval = fast_interval if flag else self.pollinterval
self.pollInfo.trigger()
def callPollFunc(self, rfunc):
"""call read method with proper error handling"""
@ -612,64 +639,92 @@ class Module(HasAccessibles):
except Exception:
self.log.error(formatException())
def __pollThread(self, started_callback):
self.writeInitParams()
if not self.enablePoll:
return
polled_parameters = []
# collect and call all read functions a first time
for pname, pobj in self.parameters.items():
rfunc = getattr(self, 'read_' + pname)
if rfunc.poll:
polled_parameters.append((rfunc, pobj))
self.callPollFunc(rfunc)
def __pollThread(self, modules, started_callback):
"""poll thread body
:param modules: list of modules to be handled by this thread
:param started_callback: to be called after all polls are done once
before polling, parameters which need hardware initialisation are written
"""
for mobj in modules:
mobj.writeInitParams()
modules = [m for m in modules if m.enablePoll]
if not modules: # no polls needed - exit thread
started_callback()
return
if hasattr(self, 'registerReconnectCallback'):
# self is a communicator supporting reconnections
def trigger_all(trg=self.triggerPoll, polled_modules=modules):
for m in polled_modules:
m.pollInfo.last_main = 0
m.pollInfo.last_slow = 0
trg.set()
self.registerReconnectCallback('trigger_polls', trigger_all)
# collect and call all read functions a first time
for mobj in modules:
pinfo = mobj.pollInfo = PollInfo(mobj.pollinterval, self.triggerPoll)
# trigger a poll interval change when self.pollinterval changes.
if 'pollinterval' in mobj.valueCallbacks:
mobj.valueCallbacks['pollinterval'].append(pinfo.update_interval)
for pname, pobj in mobj.parameters.items():
rfunc = getattr(mobj, 'read_' + pname)
if rfunc.poll:
pinfo.polled_parameters.append((mobj, rfunc, pobj))
mobj.callPollFunc(rfunc)
started_callback()
pollinterval = self.pollinterval
last_slow = last_main = 0
last_error = None
error_count = 0
to_poll = ()
while True:
now = time.time()
wait_main = last_main + pollinterval - now
wait_slow = last_slow + self.slowinterval - now
wait_time = min(wait_main, wait_slow)
wait_time = 999
for mobj in modules:
pinfo = mobj.pollInfo
wait_time = min(pinfo.last_main + pinfo.interval - now, wait_time,
pinfo.last_slow + mobj.slowinterval - now)
if wait_time > 0:
try:
result = self.changePollinterval.get(timeout=wait_time)
except Empty:
result = None
if result is not None:
pollinterval = result
self.triggerPoll.wait(wait_time)
self.triggerPoll.clear()
continue
# call doPoll, if due
if wait_main <= 0:
last_main = (now // pollinterval) * pollinterval
# call doPoll of all modules where due
for mobj in modules:
pinfo = mobj.pollInfo
if now > pinfo.last_main + pinfo.interval:
pinfo.last_main = (now // pinfo.interval) * pinfo.interval
try:
self.doPoll()
if last_error and error_count > 1:
self.log.info('recovered after %d calls to doPoll (%r)', error_count, last_error)
last_error = None
mobj.doPoll()
pinfo.last_error = None
except Exception as e:
if repr(e) != last_error:
error_count = 0
self.log.error('error in doPoll: %r', e)
error_count += 1
last_error = repr(e)
if str(e) != str(pinfo.last_error) and not isinstance(e, SilentError):
mobj.log.error('doPoll: %r', e)
pinfo.last_error = e
now = time.time()
# find ONE due slow poll and call it
loop = True
while loop: # loops max. 2 times, when to_poll is at end
for rfunc, pobj in to_poll:
if now > pobj.timestamp + self.slowinterval * 0.5:
self.callPollFunc(rfunc)
loop = False
for mobj, rfunc, pobj in to_poll:
if now > pobj.timestamp + mobj.slowinterval * 0.5:
try:
prev_err = pobj.readerror
rfunc()
except Exception as e:
if not isinstance(e, SilentError) and str(pobj.readerror) != str(prev_err):
mobj.log.error('%s: %r', pobj.name, e)
loop = False # one poll done
break
else:
if now < last_slow + self.slowinterval:
break
last_slow = (now // self.slowinterval) * self.slowinterval
to_poll = iter(polled_parameters)
to_poll = []
# collect due slow polls
for mobj in modules:
pinfo = mobj.pollInfo
if now > pinfo.last_slow + mobj.slowinterval:
to_poll.extend(pinfo.polled_parameters)
pinfo.last_slow = (now // mobj.slowinterval) * mobj.slowinterval
if to_poll:
to_poll = iter(to_poll)
else:
loop = False # no slow polls ready
def writeInitParams(self, started_callback=None):
"""write values for parameters with configured values
@ -722,12 +777,6 @@ class Readable(Module):
pollinterval = Parameter('default poll interval', FloatRange(0.1, 120),
default=5, readonly=False, export=True)
def earlyInit(self):
super().earlyInit()
# trigger a poll interval change when self.pollinterval changes.
# self.setFastPoll with a float argument does the job here
self.valueCallbacks['pollinterval'].append(self.setFastPoll)
def doPoll(self):
self.read_value()
self.read_status()

View File

@ -136,10 +136,11 @@ class HasProperties(HasDescriptors):
properties.update({k: v for k, v in base.__dict__.items() if isinstance(v, Property)})
cls.propertyDict = properties
# treat overriding properties with bare values
for pn, po in properties.items():
for pn, po in list(properties.items()):
value = getattr(cls, pn, po)
if not isinstance(value, (Property, HasProperties)): # attribute may be a bare value
# HasProperties is a base class of Parameter -> allow a Parameter to override a Property ()
if isinstance(value, HasProperties): # value is a Parameter, allow override
properties.pop(pn)
elif not isinstance(value, Property): # attribute may be a bare value
po = po.copy()
try:
# try to apply bare value to Property

View File

@ -69,6 +69,7 @@ class ServerStub:
class DummyMultiEvent(threading.Event):
def get_trigger(self):
def trigger(event=self):
event.set()
sys.exit()
@ -80,8 +81,9 @@ def test_Communicator():
o.earlyInit()
o.initModule()
event = DummyMultiEvent()
o.initModule()
o.startModule(event)
assert event.is_set() # event should be set immediately
assert event.wait(timeout=0.1)
def test_ModuleMagic():
@ -193,8 +195,9 @@ def test_ModuleMagic():
assert updates.pop('o1') == expectedBeforeStart
o1.earlyInit()
event = DummyMultiEvent()
o1.initModule()
o1.startModule(event)
event.wait()
assert event.wait(timeout=0.1)
# should contain polled values
expectedAfterStart = {
'status': (Drivable.Status.IDLE, 'ok'), 'value': 'second',
@ -209,8 +212,9 @@ def test_ModuleMagic():
assert updates.pop('o2') == expectedBeforeStart
o2.earlyInit()
event = DummyMultiEvent()
o2.initModule()
o2.startModule(event)
event.wait()
assert event.wait(timeout=0.1)
# value has changed type, b2 and a1 are written
expectedAfterStart.update(value=0, b2='EMPTY', a1=True)
# ramerk: a1=True: this behaviour is a Porgamming error

View File

@ -23,6 +23,7 @@
import sys
import threading
from time import time as current_time
import time
import logging
@ -33,45 +34,21 @@ from secop.lib.multievent import MultiEvent
class Time:
STARTTIME = 1000 # artificial time zero
"""artificial time, forwarded on sleep instead of waiting"""
def __init__(self):
self.reset()
self.finish = float('inf')
self.stop = lambda : None
self.commtime = 0.05 # time needed for 1 poll
def reset(self, lifetime=10):
self.seconds = self.STARTTIME
self.idletime = 0.0
self.busytime = 0.0
self.finish = self.STARTTIME + lifetime
self.offset = 0
def time(self):
if self.seconds > self.finish:
self.finish = float('inf')
self.stop()
return self.seconds
return current_time() + self.offset
def sleep(self, seconds):
assert 0 <= seconds <= 24*3600
self.idletime += seconds
self.seconds += seconds
def busy(self, seconds):
assert seconds >= 0
self.seconds += seconds
self.busytime += seconds
self.offset += seconds
artime = Time() # artificial test time
class Event(threading.Event):
def wait(self, timeout=None):
artime.sleep(max(0, timeout))
class DispatcherStub:
maxcycles = 10
@ -97,13 +74,25 @@ class Base(Module):
srv = ServerStub()
super().__init__('mod', logging.getLogger('dummy'), dict(description=''), srv)
self.dispatcher = srv.dispatcher
self.nextPollEvent = Event()
def run(self, maxcycles):
self.dispatcher.maxcycles = maxcycles
self.dispatcher.finish_event = threading.Event()
self.initModule()
def wait(timeout=None, base=self.triggerPoll):
"""simplified simulation
when an event is already set return True, else forward artificial time
"""
if base.is_set():
return True
artime.sleep(max(0.0, 99.9 if timeout is None else timeout))
return base.is_set()
self.triggerPoll.wait = wait
self.startModule(MultiEvent())
self.dispatcher.finish_event.wait(1)
assert self.dispatcher.finish_event.wait(1)
class Mod1(Base, Readable):
@ -132,14 +121,13 @@ class Mod1(Base, Readable):
@pytest.mark.parametrize(
'ncycles, pollinterval, slowinterval, mspan, pspan',
[ # normal case: 5+-1 15+-1
( 60, 5, 15, (4, 6), (14, 16)),
# pollinterval faster then reading: mspan max 3 s (polls of value, status and ONE other parameter)
( 60, 1, 5, (1, 3), (5, 16)),
[ # normal case:
( 60, 5, 15, (4.9, 5.1), (14, 16)),
# pollinterval faster then reading: mspan max ~ 3 s (polls of value, status and ONE other parameter)
( 60, 1, 5, (0.9, 3.1), (5, 17)),
])
def test_poll(ncycles, pollinterval, slowinterval, mspan, pspan, monkeypatch):
monkeypatch.setattr(time, 'time', artime.time)
artime.reset()
m = Mod1()
m.pollinterval = pollinterval
m.slowInterval = slowinterval
@ -148,18 +136,18 @@ def test_poll(ncycles, pollinterval, slowinterval, mspan, pspan, monkeypatch):
for pname in ['value', 'status']:
pobj = m.parameters[pname]
lowcnt = 0
print(pname, [t2 - t1 for t1, t2 in zip(pobj.stat[1:], pobj.stat[2:-1])])
for t1, t2 in zip(pobj.stat[1:], pobj.stat[2:-1]):
if t2 - t1 < mspan[0]:
print(t2 - t1)
lowcnt += 1
assert t2 - t1 <= mspan[1]
assert lowcnt <= 1
assert lowcnt <= 2
for pname in ['param1', 'param2', 'param3']:
pobj = m.parameters[pname]
lowcnt = 0
print(pname, [t2 - t1 for t1, t2 in zip(pobj.stat[1:], pobj.stat[2:-1])])
for t1, t2 in zip(pobj.stat[1:], pobj.stat[2:-1]):
if t2 - t1 < pspan[0]:
print(pname, t2 - t1)
lowcnt += 1
assert t2 - t1 <= pspan[1]
assert lowcnt <= 1
assert lowcnt <= 2