diff --git a/secop/modules.py b/secop/modules.py index e6b3191..a989f7b 100644 --- a/secop/modules.py +++ b/secop/modules.py @@ -177,11 +177,19 @@ class Module(object): def init(self): # may be overriden in derived classes to init stuff self.log.debug('empty init()') - mkthread(self.late_init) - def late_init(self): - # this runs async somewhen after init - self.log.debug('late init()') + def postinit(self): + self.log.debug('empty postinit()') + + def late_init(self, started_callback): + '''runs after postinit of all modules + + started_callback to be called when thread spawned by late_init + or, if not implmemented, immediately + ''' + + self.log.debug('empty late init()') + started_callback(self) class Readable(Module): @@ -217,26 +225,33 @@ class Readable(Module): def init(self): Module.init(self) - self._pollthread = mkthread(self.__pollThread) - def __pollThread(self): + def late_init(self, started_callback): + '''start polling thread''' + mkthread(self.__pollThread, started_callback) + + def __pollThread(self, started_callback): try: - self.__pollThread_inner() + self.__pollThread_inner(started_callback) except Exception as e: self.log.exception(e) + self.status = (self.Status.ERROR, 'polling thread could not start') + started_callback(self) print(formatExtendedStack()) - def __pollThread_inner(self): + def __pollThread_inner(self, started_callback): """super simple and super stupid per-module polling thread""" i = 0 + fastpoll = self.poll(i) + started_callback(self) while True: - fastpoll = self.poll(i) i += 1 try: time.sleep(self.pollinterval * (0.1 if fastpoll else 1)) except TypeError: time.sleep(min(self.pollinterval) if fastpoll else max(self.pollinterval)) + fastpoll = self.poll(i) def poll(self, nr=0): # Just poll all parameters regularly where polling is enabled diff --git a/secop/protocol/dispatcher.py b/secop/protocol/dispatcher.py index 4d2295b..8b70fc0 100644 --- a/secop/protocol/dispatcher.py +++ b/secop/protocol/dispatcher.py @@ -390,7 +390,7 @@ class Dispatcher(object): self._active_connections.add(conn) modules = self._modules - # for initial update poll all values... + # send updates for all values. The first poll already happend before the server is active for modulename in modules: moduleobj = self._modules.get(modulename, None) if moduleobj is None: @@ -399,22 +399,10 @@ class Dispatcher(object): for pname, pobj in moduleobj.parameters.items(): if not pobj.export: # XXX: handle export_as cases! continue - # WARNING: THIS READS ALL parameters FROM HW! - # XXX: should we send the cached values instead? (pbj.value) - # also: ignore errors here. - try: - res = self._getParameterValue(modulename, pname) - if res[0] == Ellipsis: # means we do not have a value at all so skip this - self.log.error( - u'activate: got no value for %s:%s!' % - (modulename, pname)) - #else: - #rm = Message(EVENTREPLY, u'%s:%s' % (modulename, pname)) - #rm.set_result(*res) - #self.broadcast_event(rm) - except SECOPError as e: - self.log.error(u'decide what to do here! (ignore error and skip update)') - self.log.exception(e) + # can not use announce_update here, as this will send to all clients + updmsg = Message(EVENTREPLY, module=moduleobj.name, parameter=pname) + updmsg.set_result(pobj.export_value(), dict(t=pobj.timestamp)) + conn.queue_async_reply(updmsg) msg.mkreply() conn.queue_async_reply(msg) # should be sent AFTER all the ^^initial updates return None diff --git a/secop/server.py b/secop/server.py index 3bbb33d..84de3b1 100644 --- a/secop/server.py +++ b/secop/server.py @@ -33,6 +33,11 @@ try: except ImportError: import ConfigParser as configparser # py2 +try: + from queue import Queue # py 3 +except ImportError: + from Queue import Queue # py 2 + from daemon import DaemonContext try: @@ -202,11 +207,23 @@ class Server(object): self._dispatcher.register_module(devobj, devname, export) # also call init on the modules devobj.init() - # call a possibly empty postinit on each module after registering all + # call postinit on each module after registering all for _devname, devobj, _export in devs: - postinit = getattr(devobj, u'postinit', None) - if postinit: - postinit() + devobj.postinit() + starting_modules = set() + finished_modules = Queue() + for _devname, devobj, _export in devs: + starting_modules.add(devobj) + devobj.late_init(started_callback=finished_modules.put) + # remark: it is the module implementors responsibility to call started_callback + # within reasonable time (using timeouts). If we find later, that this is not + # enough, we might insert checking for a timeout here, and somehow set the remaining + # starting_modules to an error state. + while starting_modules: + finished = finished_modules.get() + self.log.info(u'%s has started' % finished.name) + # use discard instead of remove here, catching the case when started_callback is called twice + starting_modules.discard(finished) def _processInterfaceOptions(self, interfaceopts): # eval interfaces