diff --git a/secop/modules.py b/secop/modules.py index fff3a92..07c0936 100644 --- a/secop/modules.py +++ b/secop/modules.py @@ -223,11 +223,12 @@ class Module(object): '''runs after init of all modules started_callback to be called when thread spawned by late_init - or, if not implmemented, immediately + or, if not implemented, immediately + might return a timeout value, if different from default ''' self.log.debug('empty %s.start_module()' % self.__class__.__name__) - started_callback(self) + started_callback() class Readable(Module): @@ -273,7 +274,7 @@ class Readable(Module): except Exception as e: self.log.exception(e) self.status = (self.Status.ERROR, 'polling thread could not start') - started_callback(self) + started_callback() print(formatException(0, sys.exc_info(), verbose=True)) time.sleep(10) @@ -281,7 +282,7 @@ class Readable(Module): """super simple and super stupid per-module polling thread""" i = 0 fastpoll = self.poll(i) - started_callback(self) + started_callback() while True: i += 1 try: diff --git a/secop/server.py b/secop/server.py index 9689084..13d913b 100644 --- a/secop/server.py +++ b/secop/server.py @@ -39,12 +39,6 @@ try: except ImportError: import ConfigParser as configparser # py2 -try: - from queue import Queue # py 3 -except ImportError: - from Queue import Queue # py 2 - - try: import daemon.pidlockfile as pidlockfile except ImportError: @@ -198,18 +192,14 @@ class Server(object): for modname, modobj in self.modules.items(): modobj.init_module() - starting_modules = set() - finished_modules = Queue() + start_events = [] for modname, modobj in self.modules.items(): - starting_modules.add(modobj) - modobj.start_module(started_callback=finished_modules.put) - # remark: it is the module implementors responsibility to call started_callback - # within reasonable time (using timeouts). If we find later, that this is not - # enough, we might insert checking for a timeout here, and somehow set the remaining - # starting_modules to an error state. - while starting_modules: - finished = finished_modules.get() - self.log.info(u'%s has started' % finished.name) - # use discard instead of remove here, catching the case when started_callback is called twice - starting_modules.discard(finished) - finished_modules.task_done() + event = threading.Event() + # start_module must return either a timeout value or None (default 30 sec) + timeout = modobj.start_module(started_callback=event.set) or 30 + start_events.append((time.time() + timeout, modname, event)) + self.log.info(u'waiting for modules being started') + for deadline, modname, event in sorted(start_events): + if not event.wait(timeout=max(0, deadline - time.time())): + self.log.info('WARNING: timeout when starting module %s' % modname) + self.log.info(u'all modules started') diff --git a/test/test_modules.py b/test/test_modules.py index f08f09d..e68d91d 100644 --- a/test/test_modules.py +++ b/test/test_modules.py @@ -25,17 +25,12 @@ from __future__ import division, print_function # no fixtures needed #import pytest +import threading from secop.datatypes import BoolType, EnumType, FloatRange from secop.metaclass import ModuleMeta from secop.modules import Communicator, Drivable, Module from secop.params import Command, Override, Parameter -try: - import Queue as queue -except ImportError: - import queue as queue - - def test_Communicator(): @@ -55,9 +50,9 @@ def test_Communicator(): o = Communicator('communicator',logger, {}, srv) o.early_init() o.init_module() - q = queue.Queue() - o.start_module(q.put) - q.get() + event = threading.Event() + o.start_module(event.set) + assert event.is_set() # event should be set immediately def test_ModuleMeta(): newclass1 = ModuleMeta.__new__(ModuleMeta, 'TestDrivable', (Drivable,), { @@ -68,6 +63,8 @@ def test_ModuleMeta(): "cmd": Command('stuff', BoolType(), BoolType()) }, "commands": { + # intermixing parameters with commands is not recommended, + # but acceptable for influencing the order 'a1': Parameter('a1', datatype=BoolType(), default=False), 'a2': Parameter('a2', datatype=BoolType(), default=True), 'value': Override(datatype=BoolType(), default=True), @@ -99,6 +96,7 @@ def test_ModuleMeta(): logger = type('LoggerStub', (object,), dict( debug = lambda self, *a: print(*a), info = lambda self, *a: print(*a), + exception = lambda self, *a: print(*a), ))() dispatcher = type('DispatcherStub', (object,), dict( @@ -144,6 +142,17 @@ def test_ModuleMeta(): o.early_init() for o in objects: o.init_module() - q = queue.Queue() + for o in objects: - o.start_module(q.put) + event = threading.Event() + event2 = threading.Event() + def started_callback(event=event, event2=event2): + if event.is_set(): + event2.set() + else: + event.set() + raise Exception("end test") # this will kill the polling thread on the second call + + o.start_module(started_callback) + assert event2.wait(timeout=1) + assert event.is_set()