From b66acd4d7330847df76dbfc3f792672991757b01 Mon Sep 17 00:00:00 2001 From: Markus Zolliker Date: Mon, 17 Mar 2025 09:35:11 +0100 Subject: [PATCH] stop poller threads on shutdown make sure module methods are not called after shutdownModule + fix: when mod.enablePoll is False, pollInfo is None therefore we have to check before access Change-Id: I83b28607b25996376939175be8abf0c5b27bcac1 --- frappy/modulebase.py | 46 ++++++++++++++++++++++++++++++-------------- frappy/secnode.py | 10 ++++++++++ 2 files changed, 42 insertions(+), 14 deletions(-) diff --git a/frappy/modulebase.py b/frappy/modulebase.py index 260a152..ca0a7e2 100644 --- a/frappy/modulebase.py +++ b/frappy/modulebase.py @@ -323,6 +323,7 @@ class Module(HasAccessibles): pollInfo = None triggerPoll = None # trigger event for polls. used on io modules and modules without io + __poller = None # the poller thread, if used def __init__(self, name, logger, cfgdict, srv): # remember the secnode for interacting with other modules and the @@ -525,9 +526,7 @@ class Module(HasAccessibles): if validate: value = pobj.datatype(value) except Exception as e: - # allow to assign an exception to trigger an error_update message - err = value if isinstance(value, Exception) else e - changed = False + err = e else: changed = pobj.value != value or pobj.readerror # store the value even in case of error @@ -614,7 +613,7 @@ class Module(HasAccessibles): # we do not need self.errors any longer. should we delete it? # del self.errors if self.polledModules: - mkthread(self.__pollThread, self.polledModules, start_events.get_trigger()) + self.__poller = mkthread(self.__pollThread, self.polledModules, start_events.get_trigger()) self.startModuleDone = True def initialReads(self): @@ -627,6 +626,27 @@ class Module(HasAccessibles): all parameters are polled once """ + def stopPollThread(self): + """trigger the poll thread to stop + + this is called on shutdown + """ + if self.__poller: + self.polledModules.clear() + self.triggerPoll.set() + + def joinPollThread(self, timeout): + """wait for poll thread to finish + + if the wait time exceeds seconds, return and log a warning + """ + if self.__poller: + self.stopPollThread() + if timeout > 0: + self.__poller.join(timeout) + if self.__poller.is_alive(): + self.log.warning('can not stop poller') + def shutdownModule(self): """called when the sever shuts down @@ -660,13 +680,10 @@ class Module(HasAccessibles): self.pollInfo.pending_errors.discard(rfunc.__name__) except Exception as e: if getattr(e, 'report_error', True): - self.log.debug('error in %r', rfunc) name = rfunc.__name__ self.pollInfo.pending_errors.add(name) # trigger o.k. message after error is resolved if isinstance(e, SECoPError): - if name == 'doPoll': - # otherwise the method is already appended in rfunc - e.raising_methods.append(f'{self.name}.{name}') + e.raising_methods.append(name) if e.silent: self.log.debug('%s', e.format(False)) else: @@ -674,7 +691,7 @@ class Module(HasAccessibles): if raise_com_failed and isinstance(e, CommunicationFailedError): raise else: - # not a SECoPError: this is probably a programming error + # not a SECoPError: this is proabably a programming error # we want to log the traceback self.log.error('%s', formatException()) @@ -734,13 +751,14 @@ class Module(HasAccessibles): if not polled_modules: # no polls needed - exit thread return to_poll = () - while True: + while modules: # modules will be cleared on shutdown now = time.time() wait_time = 999 for mobj in modules: pinfo = mobj.pollInfo - wait_time = min(pinfo.last_main + pinfo.interval - now, wait_time, - pinfo.last_slow + mobj.slowinterval - now) + if pinfo: + wait_time = min(pinfo.last_main + pinfo.interval - now, wait_time, + pinfo.last_slow + mobj.slowinterval - now) if wait_time > 0 and not to_poll: # nothing to do self.triggerPoll.wait(wait_time) @@ -749,7 +767,7 @@ class Module(HasAccessibles): # call doPoll of all modules where due for mobj in modules: pinfo = mobj.pollInfo - if now > pinfo.last_main + pinfo.interval: + if pinfo and now > pinfo.last_main + pinfo.interval: try: pinfo.last_main = (now // pinfo.interval) * pinfo.interval except ZeroDivisionError: @@ -769,7 +787,7 @@ class Module(HasAccessibles): # collect due slow polls for mobj in modules: pinfo = mobj.pollInfo - if now > pinfo.last_slow + mobj.slowinterval: + if pinfo and now > pinfo.last_slow + mobj.slowinterval: to_poll.extend(pinfo.polled_parameters) pinfo.last_slow = (now // mobj.slowinterval) * mobj.slowinterval if to_poll: diff --git a/frappy/secnode.py b/frappy/secnode.py index 9489923..86fc9c8 100644 --- a/frappy/secnode.py +++ b/frappy/secnode.py @@ -19,6 +19,7 @@ # # ***************************************************************************** +import time import traceback from collections import OrderedDict @@ -255,6 +256,15 @@ class SecNode: def shutdown_modules(self): """Call 'shutdownModule' for all modules.""" + # stop pollers + for name in self.modules: + self.modules[name].stopPollThread() + # do not yet join here, as we want to wait in parallel + now = time.time() + deadline = now + 5 + for name in self.modules: + self.modules[name].joinPollThread(now - deadline) + now = time.time() for name in self._getSortedModules(): self.modules[name].shutdownModule()