stop poller threads on shutdown
make sure module methods are not called after shutdownModule + fix: when mod.enablePoll is False, pollInfo is None therefore we have to check before access Change-Id: I83b28607b25996376939175be8abf0c5b27bcac1
This commit is contained in:
parent
da24c6244e
commit
b66acd4d73
@ -323,6 +323,7 @@ class Module(HasAccessibles):
|
||||
|
||||
pollInfo = None
|
||||
triggerPoll = None # trigger event for polls. used on io modules and modules without io
|
||||
__poller = None # the poller thread, if used
|
||||
|
||||
def __init__(self, name, logger, cfgdict, srv):
|
||||
# remember the secnode for interacting with other modules and the
|
||||
@ -525,9 +526,7 @@ class Module(HasAccessibles):
|
||||
if validate:
|
||||
value = pobj.datatype(value)
|
||||
except Exception as e:
|
||||
# allow to assign an exception to trigger an error_update message
|
||||
err = value if isinstance(value, Exception) else e
|
||||
changed = False
|
||||
err = e
|
||||
else:
|
||||
changed = pobj.value != value or pobj.readerror
|
||||
# store the value even in case of error
|
||||
@ -614,7 +613,7 @@ class Module(HasAccessibles):
|
||||
# we do not need self.errors any longer. should we delete it?
|
||||
# del self.errors
|
||||
if self.polledModules:
|
||||
mkthread(self.__pollThread, self.polledModules, start_events.get_trigger())
|
||||
self.__poller = mkthread(self.__pollThread, self.polledModules, start_events.get_trigger())
|
||||
self.startModuleDone = True
|
||||
|
||||
def initialReads(self):
|
||||
@ -627,6 +626,27 @@ class Module(HasAccessibles):
|
||||
all parameters are polled once
|
||||
"""
|
||||
|
||||
def stopPollThread(self):
|
||||
"""trigger the poll thread to stop
|
||||
|
||||
this is called on shutdown
|
||||
"""
|
||||
if self.__poller:
|
||||
self.polledModules.clear()
|
||||
self.triggerPoll.set()
|
||||
|
||||
def joinPollThread(self, timeout):
|
||||
"""wait for poll thread to finish
|
||||
|
||||
if the wait time exceeds <timeout> seconds, return and log a warning
|
||||
"""
|
||||
if self.__poller:
|
||||
self.stopPollThread()
|
||||
if timeout > 0:
|
||||
self.__poller.join(timeout)
|
||||
if self.__poller.is_alive():
|
||||
self.log.warning('can not stop poller')
|
||||
|
||||
def shutdownModule(self):
|
||||
"""called when the sever shuts down
|
||||
|
||||
@ -660,13 +680,10 @@ class Module(HasAccessibles):
|
||||
self.pollInfo.pending_errors.discard(rfunc.__name__)
|
||||
except Exception as e:
|
||||
if getattr(e, 'report_error', True):
|
||||
self.log.debug('error in %r', rfunc)
|
||||
name = rfunc.__name__
|
||||
self.pollInfo.pending_errors.add(name) # trigger o.k. message after error is resolved
|
||||
if isinstance(e, SECoPError):
|
||||
if name == 'doPoll':
|
||||
# otherwise the method is already appended in rfunc
|
||||
e.raising_methods.append(f'{self.name}.{name}')
|
||||
e.raising_methods.append(name)
|
||||
if e.silent:
|
||||
self.log.debug('%s', e.format(False))
|
||||
else:
|
||||
@ -674,7 +691,7 @@ class Module(HasAccessibles):
|
||||
if raise_com_failed and isinstance(e, CommunicationFailedError):
|
||||
raise
|
||||
else:
|
||||
# not a SECoPError: this is probably a programming error
|
||||
# not a SECoPError: this is proabably a programming error
|
||||
# we want to log the traceback
|
||||
self.log.error('%s', formatException())
|
||||
|
||||
@ -734,13 +751,14 @@ class Module(HasAccessibles):
|
||||
if not polled_modules: # no polls needed - exit thread
|
||||
return
|
||||
to_poll = ()
|
||||
while True:
|
||||
while modules: # modules will be cleared on shutdown
|
||||
now = time.time()
|
||||
wait_time = 999
|
||||
for mobj in modules:
|
||||
pinfo = mobj.pollInfo
|
||||
wait_time = min(pinfo.last_main + pinfo.interval - now, wait_time,
|
||||
pinfo.last_slow + mobj.slowinterval - now)
|
||||
if pinfo:
|
||||
wait_time = min(pinfo.last_main + pinfo.interval - now, wait_time,
|
||||
pinfo.last_slow + mobj.slowinterval - now)
|
||||
if wait_time > 0 and not to_poll:
|
||||
# nothing to do
|
||||
self.triggerPoll.wait(wait_time)
|
||||
@ -749,7 +767,7 @@ class Module(HasAccessibles):
|
||||
# call doPoll of all modules where due
|
||||
for mobj in modules:
|
||||
pinfo = mobj.pollInfo
|
||||
if now > pinfo.last_main + pinfo.interval:
|
||||
if pinfo and now > pinfo.last_main + pinfo.interval:
|
||||
try:
|
||||
pinfo.last_main = (now // pinfo.interval) * pinfo.interval
|
||||
except ZeroDivisionError:
|
||||
@ -769,7 +787,7 @@ class Module(HasAccessibles):
|
||||
# collect due slow polls
|
||||
for mobj in modules:
|
||||
pinfo = mobj.pollInfo
|
||||
if now > pinfo.last_slow + mobj.slowinterval:
|
||||
if pinfo and now > pinfo.last_slow + mobj.slowinterval:
|
||||
to_poll.extend(pinfo.polled_parameters)
|
||||
pinfo.last_slow = (now // mobj.slowinterval) * mobj.slowinterval
|
||||
if to_poll:
|
||||
|
@ -19,6 +19,7 @@
|
||||
#
|
||||
# *****************************************************************************
|
||||
|
||||
import time
|
||||
import traceback
|
||||
from collections import OrderedDict
|
||||
|
||||
@ -255,6 +256,15 @@ class SecNode:
|
||||
|
||||
def shutdown_modules(self):
|
||||
"""Call 'shutdownModule' for all modules."""
|
||||
# stop pollers
|
||||
for name in self.modules:
|
||||
self.modules[name].stopPollThread()
|
||||
# do not yet join here, as we want to wait in parallel
|
||||
now = time.time()
|
||||
deadline = now + 5
|
||||
for name in self.modules:
|
||||
self.modules[name].joinPollThread(now - deadline)
|
||||
now = time.time()
|
||||
for name in self._getSortedModules():
|
||||
self.modules[name].shutdownModule()
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user