read parameters from hardware before starting up server
+ send cached values on activate instead of reading from hardware Change-Id: Ied9b29969cef3584a788fcd51d5b31946b0a3f25 Reviewed-on: https://forge.frm2.tum.de/review/18234 Tested-by: JenkinsCodeReview <bjoern_pedersen@frm2.tum.de> Reviewed-by: Markus Zolliker <markus.zolliker@psi.ch>
This commit is contained in:
@ -177,11 +177,19 @@ class Module(object):
|
|||||||
def init(self):
|
def init(self):
|
||||||
# may be overriden in derived classes to init stuff
|
# may be overriden in derived classes to init stuff
|
||||||
self.log.debug('empty init()')
|
self.log.debug('empty init()')
|
||||||
mkthread(self.late_init)
|
|
||||||
|
|
||||||
def late_init(self):
|
def postinit(self):
|
||||||
# this runs async somewhen after init
|
self.log.debug('empty postinit()')
|
||||||
self.log.debug('late init()')
|
|
||||||
|
def late_init(self, started_callback):
|
||||||
|
'''runs after postinit of all modules
|
||||||
|
|
||||||
|
started_callback to be called when thread spawned by late_init
|
||||||
|
or, if not implmemented, immediately
|
||||||
|
'''
|
||||||
|
|
||||||
|
self.log.debug('empty late init()')
|
||||||
|
started_callback(self)
|
||||||
|
|
||||||
|
|
||||||
class Readable(Module):
|
class Readable(Module):
|
||||||
@ -217,26 +225,33 @@ class Readable(Module):
|
|||||||
|
|
||||||
def init(self):
|
def init(self):
|
||||||
Module.init(self)
|
Module.init(self)
|
||||||
self._pollthread = mkthread(self.__pollThread)
|
|
||||||
|
|
||||||
def __pollThread(self):
|
def late_init(self, started_callback):
|
||||||
|
'''start polling thread'''
|
||||||
|
mkthread(self.__pollThread, started_callback)
|
||||||
|
|
||||||
|
def __pollThread(self, started_callback):
|
||||||
try:
|
try:
|
||||||
self.__pollThread_inner()
|
self.__pollThread_inner(started_callback)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.log.exception(e)
|
self.log.exception(e)
|
||||||
|
self.status = (self.Status.ERROR, 'polling thread could not start')
|
||||||
|
started_callback(self)
|
||||||
print(formatExtendedStack())
|
print(formatExtendedStack())
|
||||||
|
|
||||||
def __pollThread_inner(self):
|
def __pollThread_inner(self, started_callback):
|
||||||
"""super simple and super stupid per-module polling thread"""
|
"""super simple and super stupid per-module polling thread"""
|
||||||
i = 0
|
i = 0
|
||||||
|
fastpoll = self.poll(i)
|
||||||
|
started_callback(self)
|
||||||
while True:
|
while True:
|
||||||
fastpoll = self.poll(i)
|
|
||||||
i += 1
|
i += 1
|
||||||
try:
|
try:
|
||||||
time.sleep(self.pollinterval * (0.1 if fastpoll else 1))
|
time.sleep(self.pollinterval * (0.1 if fastpoll else 1))
|
||||||
except TypeError:
|
except TypeError:
|
||||||
time.sleep(min(self.pollinterval)
|
time.sleep(min(self.pollinterval)
|
||||||
if fastpoll else max(self.pollinterval))
|
if fastpoll else max(self.pollinterval))
|
||||||
|
fastpoll = self.poll(i)
|
||||||
|
|
||||||
def poll(self, nr=0):
|
def poll(self, nr=0):
|
||||||
# Just poll all parameters regularly where polling is enabled
|
# Just poll all parameters regularly where polling is enabled
|
||||||
|
@ -390,7 +390,7 @@ class Dispatcher(object):
|
|||||||
self._active_connections.add(conn)
|
self._active_connections.add(conn)
|
||||||
modules = self._modules
|
modules = self._modules
|
||||||
|
|
||||||
# for initial update poll all values...
|
# send updates for all values. The first poll already happend before the server is active
|
||||||
for modulename in modules:
|
for modulename in modules:
|
||||||
moduleobj = self._modules.get(modulename, None)
|
moduleobj = self._modules.get(modulename, None)
|
||||||
if moduleobj is None:
|
if moduleobj is None:
|
||||||
@ -399,22 +399,10 @@ class Dispatcher(object):
|
|||||||
for pname, pobj in moduleobj.parameters.items():
|
for pname, pobj in moduleobj.parameters.items():
|
||||||
if not pobj.export: # XXX: handle export_as cases!
|
if not pobj.export: # XXX: handle export_as cases!
|
||||||
continue
|
continue
|
||||||
# WARNING: THIS READS ALL parameters FROM HW!
|
# can not use announce_update here, as this will send to all clients
|
||||||
# XXX: should we send the cached values instead? (pbj.value)
|
updmsg = Message(EVENTREPLY, module=moduleobj.name, parameter=pname)
|
||||||
# also: ignore errors here.
|
updmsg.set_result(pobj.export_value(), dict(t=pobj.timestamp))
|
||||||
try:
|
conn.queue_async_reply(updmsg)
|
||||||
res = self._getParameterValue(modulename, pname)
|
|
||||||
if res[0] == Ellipsis: # means we do not have a value at all so skip this
|
|
||||||
self.log.error(
|
|
||||||
u'activate: got no value for %s:%s!' %
|
|
||||||
(modulename, pname))
|
|
||||||
#else:
|
|
||||||
#rm = Message(EVENTREPLY, u'%s:%s' % (modulename, pname))
|
|
||||||
#rm.set_result(*res)
|
|
||||||
#self.broadcast_event(rm)
|
|
||||||
except SECOPError as e:
|
|
||||||
self.log.error(u'decide what to do here! (ignore error and skip update)')
|
|
||||||
self.log.exception(e)
|
|
||||||
msg.mkreply()
|
msg.mkreply()
|
||||||
conn.queue_async_reply(msg) # should be sent AFTER all the ^^initial updates
|
conn.queue_async_reply(msg) # should be sent AFTER all the ^^initial updates
|
||||||
return None
|
return None
|
||||||
|
@ -33,6 +33,11 @@ try:
|
|||||||
except ImportError:
|
except ImportError:
|
||||||
import ConfigParser as configparser # py2
|
import ConfigParser as configparser # py2
|
||||||
|
|
||||||
|
try:
|
||||||
|
from queue import Queue # py 3
|
||||||
|
except ImportError:
|
||||||
|
from Queue import Queue # py 2
|
||||||
|
|
||||||
from daemon import DaemonContext
|
from daemon import DaemonContext
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@ -202,11 +207,23 @@ class Server(object):
|
|||||||
self._dispatcher.register_module(devobj, devname, export)
|
self._dispatcher.register_module(devobj, devname, export)
|
||||||
# also call init on the modules
|
# also call init on the modules
|
||||||
devobj.init()
|
devobj.init()
|
||||||
# call a possibly empty postinit on each module after registering all
|
# call postinit on each module after registering all
|
||||||
for _devname, devobj, _export in devs:
|
for _devname, devobj, _export in devs:
|
||||||
postinit = getattr(devobj, u'postinit', None)
|
devobj.postinit()
|
||||||
if postinit:
|
starting_modules = set()
|
||||||
postinit()
|
finished_modules = Queue()
|
||||||
|
for _devname, devobj, _export in devs:
|
||||||
|
starting_modules.add(devobj)
|
||||||
|
devobj.late_init(started_callback=finished_modules.put)
|
||||||
|
# remark: it is the module implementors responsibility to call started_callback
|
||||||
|
# within reasonable time (using timeouts). If we find later, that this is not
|
||||||
|
# enough, we might insert checking for a timeout here, and somehow set the remaining
|
||||||
|
# starting_modules to an error state.
|
||||||
|
while starting_modules:
|
||||||
|
finished = finished_modules.get()
|
||||||
|
self.log.info(u'%s has started' % finished.name)
|
||||||
|
# use discard instead of remove here, catching the case when started_callback is called twice
|
||||||
|
starting_modules.discard(finished)
|
||||||
|
|
||||||
def _processInterfaceOptions(self, interfaceopts):
|
def _processInterfaceOptions(self, interfaceopts):
|
||||||
# eval interfaces
|
# eval interfaces
|
||||||
|
Reference in New Issue
Block a user