From de5f17695cea15278ec12691ae4b5305a732528e Mon Sep 17 00:00:00 2001
From: Markus Zolliker
Date: Thu, 2 Jun 2022 10:03:38 +0200
Subject: [PATCH] make startup faster in case of errors

When the io of one SECoP module fails, it takes ages to startup
because each parameter poll takes the time to wait for a timeout.
After the first communication error on an io, no more startup
polls are tried on the modules using this io.

Change-Id: I0d250953dfe91a7d68d2d2b108395cc25d471afe
Reviewed-on: https://forge.frm2.tum.de/review/c/sine2020/secop/playground/+/28588
Tested-by: Jenkins Automated Tests
Reviewed-by: Enrico Faulhaber
Reviewed-by: Markus Zolliker
---
Review note: IOBase.check_connection() has to store the time of the last
reconnect attempt on the instance (self._last_connect_attempt). A bare
"_last_connect_attempt = now" only binds a local name, so the attribute
stays 0 and a reconnect is attempted on every communicate() call.

 secop/io.py      | 32 ++++++++++++++------------------
 secop/modules.py | 19 +++++++++++++++----
 2 files changed, 29 insertions(+), 22 deletions(-)

diff --git a/secop/io.py b/secop/io.py
index 0ff9359..a5431c3 100644
--- a/secop/io.py
+++ b/secop/io.py
@@ -71,14 +71,6 @@ class HasIO(Module):
         elif not io:
             raise ConfigError("Module %s needs a value for either 'uri' or 'io'" % name)
 
-    def initModule(self):
-        try:
-            self.io.read_is_connected()
-        except (CommunicationFailedError, AttributeError):
-            # AttributeError: read_is_connected is not required for an io object
-            pass
-        super().initModule()
-
     def communicate(self, *args):
         return self.io.communicate(*args)
 
@@ -118,6 +110,7 @@ class IOBase(Communicator):
     _conn = None
     _last_error = None
     _lock = None
+    _last_connect_attempt = 0
 
     def earlyInit(self):
         super().earlyInit()
@@ -169,6 +162,17 @@ class IOBase(Communicator):
             return False
         return self.read_is_connected()
 
+    def check_connection(self):
+        """called before communicate: raise SilentError unless connected or reconnected"""
+        if not self.is_connected:
+            now = time.time()
+            if now >= self._last_connect_attempt + self.pollinterval:
+                # we do not try to reconnect more often than pollinterval
+                self._last_connect_attempt = now
+                if self.read_is_connected():
+                    return
+            raise SilentError('disconnected') from None
+
     def registerReconnectCallback(self, name, func):
         """register reconnect callback
 
@@ -250,11 +254,7 @@ class StringIO(IOBase):
         wait_before is respected for end_of_lines within a command.
         """
         command = command.encode(self.encoding)
-        if not self.is_connected:
-            # do not try to reconnect here
-            # read_is_connected is doing this when called by its poller
-            self.read_is_connected()  # try to reconnect
-            raise SilentError('disconnected') from None
+        self.check_connection()
         try:
             with self._lock:
                 # read garbage and wait before send
@@ -359,11 +359,7 @@ class BytesIO(IOBase):
     @Command((BLOBType(), IntRange(0)), result=BLOBType())
     def communicate(self, request, replylen):  # pylint: disable=arguments-differ
         """send a request and receive (at least) bytes as reply"""
-        if not self.is_connected:
-            # do not try to reconnect here
-            # read_is_connected is doing this when called by its poller
-            self.read_is_connected()  # try to reconnect
-            raise SilentError('disconnected') from None
+        self.check_connection()
         try:
             with self._lock:
                 # read garbage and wait before send
diff --git a/secop/modules.py b/secop/modules.py
index 827740b..db6d296 100644
--- a/secop/modules.py
+++ b/secop/modules.py
@@ -30,7 +30,7 @@ from functools import wraps
 
 from secop.datatypes import ArrayOf, BoolType, EnumType, FloatRange, \
     IntRange, StatusType, StringType, TextType, TupleOf, DiscouragedConversion
-from secop.errors import BadValueError, ConfigError, \
+from secop.errors import BadValueError, CommunicationFailedError, ConfigError, \
     ProgrammingError, SECoPError, secop_error
 from secop.lib import formatException, mkthread, UniqueObject, generalConfig
 from secop.lib.enum import Enum
@@ -641,7 +641,7 @@ class Module(HasAccessibles):
         self.pollInfo.interval = fast_interval if flag else self.pollinterval
         self.pollInfo.trigger()
 
-    def callPollFunc(self, rfunc):
+    def callPollFunc(self, rfunc, raise_com_failed=False):
         """call read method with proper error handling"""
         try:
             rfunc()
@@ -658,6 +658,8 @@ class Module(HasAccessibles):
                 else:
                     # uncatched error: this is more serious
                     self.log.error('%s: %s', name, formatException())
+            if raise_com_failed and isinstance(e, CommunicationFailedError):
+                raise
 
     def __pollThread(self, modules, started_callback):
         """poll thread body
@@ -682,7 +684,7 @@ class Module(HasAccessibles):
                 trg.set()
             self.registerReconnectCallback('trigger_polls', trigger_all)
 
-        # collect and call all read functions a first time
+        # collect all read functions
         for mobj in modules:
             pinfo = mobj.pollInfo = PollInfo(mobj.pollinterval, self.triggerPoll)
             # trigger a poll interval change when self.pollinterval changes.
@@ -693,7 +695,16 @@ class Module(HasAccessibles):
                 rfunc = getattr(mobj, 'read_' + pname)
                 if rfunc.poll:
                     pinfo.polled_parameters.append((mobj, rfunc, pobj))
-                    mobj.callPollFunc(rfunc)
+        # call all read functions a first time
+        try:
+            for m in modules:
+                for mobj, rfunc, _ in m.pollInfo.polled_parameters:
+                    mobj.callPollFunc(rfunc, raise_com_failed=True)
+        except CommunicationFailedError as e:
+            # when communication failed, probably all parameters and maybe more modules are affected.
+            # as this would take a lot of time (summed up timeouts), we do not continue
+            # trying and let the server accept connections, further polls might succeed later
+            self.log.error('communication failure on startup: %s', e)
         started_callback()
         to_poll = ()
         while True: