make startup faster in case of errors
When the io of one SECoP module fails, it takes ages to start up, because each parameter poll has to wait for a timeout. After the first communication error on an io, no more startup polls are attempted on the modules using this io. Change-Id: I0d250953dfe91a7d68d2d2b108395cc25d471afe Reviewed-on: https://forge.frm2.tum.de/review/c/sine2020/secop/playground/+/28588 Tested-by: Jenkins Automated Tests <pedersen+jenkins@frm2.tum.de> Reviewed-by: Enrico Faulhaber <enrico.faulhaber@frm2.tum.de> Reviewed-by: Markus Zolliker <markus.zolliker@psi.ch>
This commit is contained in:
parent
cd90385e6c
commit
8579368259
32
secop/io.py
32
secop/io.py
@ -71,14 +71,6 @@ class HasIO(Module):
|
|||||||
elif not io:
|
elif not io:
|
||||||
raise ConfigError("Module %s needs a value for either 'uri' or 'io'" % name)
|
raise ConfigError("Module %s needs a value for either 'uri' or 'io'" % name)
|
||||||
|
|
||||||
def initModule(self):
|
|
||||||
try:
|
|
||||||
self.io.read_is_connected()
|
|
||||||
except (CommunicationFailedError, AttributeError):
|
|
||||||
# AttributeError: read_is_connected is not required for an io object
|
|
||||||
pass
|
|
||||||
super().initModule()
|
|
||||||
|
|
||||||
def communicate(self, *args):
|
def communicate(self, *args):
|
||||||
return self.io.communicate(*args)
|
return self.io.communicate(*args)
|
||||||
|
|
||||||
@ -118,6 +110,7 @@ class IOBase(Communicator):
|
|||||||
_conn = None
|
_conn = None
|
||||||
_last_error = None
|
_last_error = None
|
||||||
_lock = None
|
_lock = None
|
||||||
|
_last_connect_attempt = 0
|
||||||
|
|
||||||
def earlyInit(self):
|
def earlyInit(self):
|
||||||
super().earlyInit()
|
super().earlyInit()
|
||||||
@ -169,6 +162,17 @@ class IOBase(Communicator):
|
|||||||
return False
|
return False
|
||||||
return self.read_is_connected()
|
return self.read_is_connected()
|
||||||
|
|
||||||
|
def check_connection(self):
    """Make sure the io is connected before communicating.

    Called at the start of communicate. Nothing happens while the io is
    connected. While it is disconnected, read_is_connected is called to
    try a reconnect, but not more often than once per pollinterval.

    :raises SilentError: when the io is (still) disconnected
    """
    if self.is_connected:
        return
    now = time.time()
    if now >= self._last_connect_attempt + self.pollinterval:
        # we do not try to reconnect more often than pollinterval.
        # the timestamp has to be stored on the instance: a bare local
        # name is discarded on return, which would disable the throttle
        self._last_connect_attempt = now
        if self.read_is_connected():
            return
    raise SilentError('disconnected') from None
|
||||||
|
|
||||||
def registerReconnectCallback(self, name, func):
|
def registerReconnectCallback(self, name, func):
|
||||||
"""register reconnect callback
|
"""register reconnect callback
|
||||||
|
|
||||||
@ -250,11 +254,7 @@ class StringIO(IOBase):
|
|||||||
wait_before is respected for end_of_lines within a command.
|
wait_before is respected for end_of_lines within a command.
|
||||||
"""
|
"""
|
||||||
command = command.encode(self.encoding)
|
command = command.encode(self.encoding)
|
||||||
if not self.is_connected:
|
self.check_connection()
|
||||||
# do not try to reconnect here
|
|
||||||
# read_is_connected is doing this when called by its poller
|
|
||||||
self.read_is_connected() # try to reconnect
|
|
||||||
raise SilentError('disconnected') from None
|
|
||||||
try:
|
try:
|
||||||
with self._lock:
|
with self._lock:
|
||||||
# read garbage and wait before send
|
# read garbage and wait before send
|
||||||
@ -359,11 +359,7 @@ class BytesIO(IOBase):
|
|||||||
@Command((BLOBType(), IntRange(0)), result=BLOBType())
|
@Command((BLOBType(), IntRange(0)), result=BLOBType())
|
||||||
def communicate(self, request, replylen): # pylint: disable=arguments-differ
|
def communicate(self, request, replylen): # pylint: disable=arguments-differ
|
||||||
"""send a request and receive (at least) <replylen> bytes as reply"""
|
"""send a request and receive (at least) <replylen> bytes as reply"""
|
||||||
if not self.is_connected:
|
self.check_connection()
|
||||||
# do not try to reconnect here
|
|
||||||
# read_is_connected is doing this when called by its poller
|
|
||||||
self.read_is_connected() # try to reconnect
|
|
||||||
raise SilentError('disconnected') from None
|
|
||||||
try:
|
try:
|
||||||
with self._lock:
|
with self._lock:
|
||||||
# read garbage and wait before send
|
# read garbage and wait before send
|
||||||
|
@ -30,7 +30,7 @@ from functools import wraps
|
|||||||
|
|
||||||
from secop.datatypes import ArrayOf, BoolType, EnumType, FloatRange, \
|
from secop.datatypes import ArrayOf, BoolType, EnumType, FloatRange, \
|
||||||
IntRange, StatusType, StringType, TextType, TupleOf, DiscouragedConversion
|
IntRange, StatusType, StringType, TextType, TupleOf, DiscouragedConversion
|
||||||
from secop.errors import BadValueError, ConfigError, \
|
from secop.errors import BadValueError, CommunicationFailedError, ConfigError, \
|
||||||
ProgrammingError, SECoPError, secop_error
|
ProgrammingError, SECoPError, secop_error
|
||||||
from secop.lib import formatException, mkthread, UniqueObject, generalConfig
|
from secop.lib import formatException, mkthread, UniqueObject, generalConfig
|
||||||
from secop.lib.enum import Enum
|
from secop.lib.enum import Enum
|
||||||
@ -620,7 +620,7 @@ class Module(HasAccessibles):
|
|||||||
self.pollInfo.interval = fast_interval if flag else self.pollinterval
|
self.pollInfo.interval = fast_interval if flag else self.pollinterval
|
||||||
self.pollInfo.trigger()
|
self.pollInfo.trigger()
|
||||||
|
|
||||||
def callPollFunc(self, rfunc):
|
def callPollFunc(self, rfunc, raise_com_failed=False):
|
||||||
"""call read method with proper error handling"""
|
"""call read method with proper error handling"""
|
||||||
try:
|
try:
|
||||||
rfunc()
|
rfunc()
|
||||||
@ -637,6 +637,8 @@ class Module(HasAccessibles):
|
|||||||
else:
|
else:
|
||||||
# uncaught error: this is more serious
|
# uncaught error: this is more serious
|
||||||
self.log.error('%s: %s', name, formatException())
|
self.log.error('%s: %s', name, formatException())
|
||||||
|
if raise_com_failed and isinstance(e, CommunicationFailedError):
|
||||||
|
raise
|
||||||
|
|
||||||
def __pollThread(self, modules, started_callback):
|
def __pollThread(self, modules, started_callback):
|
||||||
"""poll thread body
|
"""poll thread body
|
||||||
@ -661,7 +663,7 @@ class Module(HasAccessibles):
|
|||||||
trg.set()
|
trg.set()
|
||||||
self.registerReconnectCallback('trigger_polls', trigger_all)
|
self.registerReconnectCallback('trigger_polls', trigger_all)
|
||||||
|
|
||||||
# collect and call all read functions a first time
|
# collect all read functions
|
||||||
for mobj in modules:
|
for mobj in modules:
|
||||||
pinfo = mobj.pollInfo = PollInfo(mobj.pollinterval, self.triggerPoll)
|
pinfo = mobj.pollInfo = PollInfo(mobj.pollinterval, self.triggerPoll)
|
||||||
# trigger a poll interval change when self.pollinterval changes.
|
# trigger a poll interval change when self.pollinterval changes.
|
||||||
@ -672,7 +674,16 @@ class Module(HasAccessibles):
|
|||||||
rfunc = getattr(mobj, 'read_' + pname)
|
rfunc = getattr(mobj, 'read_' + pname)
|
||||||
if rfunc.poll:
|
if rfunc.poll:
|
||||||
pinfo.polled_parameters.append((mobj, rfunc, pobj))
|
pinfo.polled_parameters.append((mobj, rfunc, pobj))
|
||||||
mobj.callPollFunc(rfunc)
|
# call all read functions a first time
|
||||||
|
try:
|
||||||
|
for m in modules:
|
||||||
|
for mobj, rfunc, _ in m.pollInfo.polled_parameters:
|
||||||
|
mobj.callPollFunc(rfunc, raise_com_failed=True)
|
||||||
|
except CommunicationFailedError as e:
|
||||||
|
# when communication fails, probably all parameters, and maybe more modules, are affected.
|
||||||
|
# as this would take a lot of time (summed up timeouts), we do not continue
|
||||||
|
# trying and let the server accept connections; further polls might succeed later
|
||||||
|
self.log.error('communication failure on startup: %s', e)
|
||||||
started_callback()
|
started_callback()
|
||||||
to_poll = ()
|
to_poll = ()
|
||||||
while True:
|
while True:
|
||||||
|
Loading…
x
Reference in New Issue
Block a user