changed started_callback mechanism

- using threading.Events instead of Queue
- started_callback has no more argument
- introduced timeout for starting modules

Change-Id: I5a8b59cf552918cf7e61ae93cda907f7b0d97836
Reviewed-on: https://forge.frm2.tum.de/review/20281
Tested-by: JenkinsCodeReview <bjoern_pedersen@frm2.tum.de>
Reviewed-by: Enrico Faulhaber <enrico.faulhaber@frm2.tum.de>
This commit is contained in:
zolliker 2019-03-29 10:33:48 +01:00
parent 752f8f8093
commit 0bb4c3730e
3 changed files with 35 additions and 35 deletions

View File

@ -223,11 +223,12 @@ class Module(object):
'''runs after init of all modules '''runs after init of all modules
started_callback to be called when thread spawned by late_init started_callback to be called when thread spawned by late_init
or, if not implmemented, immediately or, if not implemented, immediately
might return a timeout value, if different from default
''' '''
self.log.debug('empty %s.start_module()' % self.__class__.__name__) self.log.debug('empty %s.start_module()' % self.__class__.__name__)
started_callback(self) started_callback()
class Readable(Module): class Readable(Module):
@ -273,7 +274,7 @@ class Readable(Module):
except Exception as e: except Exception as e:
self.log.exception(e) self.log.exception(e)
self.status = (self.Status.ERROR, 'polling thread could not start') self.status = (self.Status.ERROR, 'polling thread could not start')
started_callback(self) started_callback()
print(formatException(0, sys.exc_info(), verbose=True)) print(formatException(0, sys.exc_info(), verbose=True))
time.sleep(10) time.sleep(10)
@ -281,7 +282,7 @@ class Readable(Module):
"""super simple and super stupid per-module polling thread""" """super simple and super stupid per-module polling thread"""
i = 0 i = 0
fastpoll = self.poll(i) fastpoll = self.poll(i)
started_callback(self) started_callback()
while True: while True:
i += 1 i += 1
try: try:

View File

@ -39,12 +39,6 @@ try:
except ImportError: except ImportError:
import ConfigParser as configparser # py2 import ConfigParser as configparser # py2
try:
from queue import Queue # py 3
except ImportError:
from Queue import Queue # py 2
try: try:
import daemon.pidlockfile as pidlockfile import daemon.pidlockfile as pidlockfile
except ImportError: except ImportError:
@ -198,18 +192,14 @@ class Server(object):
for modname, modobj in self.modules.items(): for modname, modobj in self.modules.items():
modobj.init_module() modobj.init_module()
starting_modules = set() start_events = []
finished_modules = Queue()
for modname, modobj in self.modules.items(): for modname, modobj in self.modules.items():
starting_modules.add(modobj) event = threading.Event()
modobj.start_module(started_callback=finished_modules.put) # start_module must return either a timeout value or None (default 30 sec)
# remark: it is the module implementors responsibility to call started_callback timeout = modobj.start_module(started_callback=event.set) or 30
# within reasonable time (using timeouts). If we find later, that this is not start_events.append((time.time() + timeout, modname, event))
# enough, we might insert checking for a timeout here, and somehow set the remaining self.log.info(u'waiting for modules being started')
# starting_modules to an error state. for deadline, modname, event in sorted(start_events):
while starting_modules: if not event.wait(timeout=max(0, deadline - time.time())):
finished = finished_modules.get() self.log.info('WARNING: timeout when starting module %s' % modname)
self.log.info(u'%s has started' % finished.name) self.log.info(u'all modules started')
# use discard instead of remove here, catching the case when started_callback is called twice
starting_modules.discard(finished)
finished_modules.task_done()

View File

@ -25,17 +25,12 @@ from __future__ import division, print_function
# no fixtures needed # no fixtures needed
#import pytest #import pytest
import threading
from secop.datatypes import BoolType, EnumType, FloatRange from secop.datatypes import BoolType, EnumType, FloatRange
from secop.metaclass import ModuleMeta from secop.metaclass import ModuleMeta
from secop.modules import Communicator, Drivable, Module from secop.modules import Communicator, Drivable, Module
from secop.params import Command, Override, Parameter from secop.params import Command, Override, Parameter
try:
import Queue as queue
except ImportError:
import queue as queue
def test_Communicator(): def test_Communicator():
@ -55,9 +50,9 @@ def test_Communicator():
o = Communicator('communicator',logger, {}, srv) o = Communicator('communicator',logger, {}, srv)
o.early_init() o.early_init()
o.init_module() o.init_module()
q = queue.Queue() event = threading.Event()
o.start_module(q.put) o.start_module(event.set)
q.get() assert event.is_set() # event should be set immediately
def test_ModuleMeta(): def test_ModuleMeta():
newclass1 = ModuleMeta.__new__(ModuleMeta, 'TestDrivable', (Drivable,), { newclass1 = ModuleMeta.__new__(ModuleMeta, 'TestDrivable', (Drivable,), {
@ -68,6 +63,8 @@ def test_ModuleMeta():
"cmd": Command('stuff', BoolType(), BoolType()) "cmd": Command('stuff', BoolType(), BoolType())
}, },
"commands": { "commands": {
# intermixing parameters with commands is not recommended,
# but acceptable for influencing the order
'a1': Parameter('a1', datatype=BoolType(), default=False), 'a1': Parameter('a1', datatype=BoolType(), default=False),
'a2': Parameter('a2', datatype=BoolType(), default=True), 'a2': Parameter('a2', datatype=BoolType(), default=True),
'value': Override(datatype=BoolType(), default=True), 'value': Override(datatype=BoolType(), default=True),
@ -99,6 +96,7 @@ def test_ModuleMeta():
logger = type('LoggerStub', (object,), dict( logger = type('LoggerStub', (object,), dict(
debug = lambda self, *a: print(*a), debug = lambda self, *a: print(*a),
info = lambda self, *a: print(*a), info = lambda self, *a: print(*a),
exception = lambda self, *a: print(*a),
))() ))()
dispatcher = type('DispatcherStub', (object,), dict( dispatcher = type('DispatcherStub', (object,), dict(
@ -144,6 +142,17 @@ def test_ModuleMeta():
o.early_init() o.early_init()
for o in objects: for o in objects:
o.init_module() o.init_module()
q = queue.Queue()
for o in objects: for o in objects:
o.start_module(q.put) event = threading.Event()
event2 = threading.Event()
def started_callback(event=event, event2=event2):
if event.is_set():
event2.set()
else:
event.set()
raise Exception("end test") # this will kill the polling thread on the second call
o.start_module(started_callback)
assert event2.wait(timeout=1)
assert event.is_set()