better behaviour on startup in case of errors

- fix a bug when TcpServer cannot start due to address in use
- report errors when restarting interfaces
- increase timeout: the timeout for waiting for all interfaces
  to start up must be higher than the longest potentially
  successful startup of TcpServer, which is currently ~ 10 sec
  (might be reduced, but then in both places)

Change-Id: I88b967c4baff79fdf94f4c849dd713d2cba6fabc
Reviewed-on: https://forge.frm2.tum.de/review/c/secop/frappy/+/33985
Tested-by: Jenkins Automated Tests <pedersen+jenkins@frm2.tum.de>
Reviewed-by: Enrico Faulhaber <enrico.faulhaber@frm2.tum.de>
Reviewed-by: Markus Zolliker <markus.zolliker@psi.ch>
Reviewed-by: Alexander Zaft <a.zaft@fz-juelich.de>
This commit is contained in:
zolliker 2024-06-19 11:48:30 +02:00
parent 986065e1f6
commit fa37b43be2
2 changed files with 44 additions and 26 deletions

View File

@ -159,7 +159,8 @@ class TCPServer(DualStackTCPServer):
self.detailed_errors = options.pop('detailed_errors', False)
self.log.info("TCPServer %s binding to port %d", name, port)
for ntry in range(5):
maxtry = 5
for ntry in range(maxtry):
try:
DualStackTCPServer.__init__(
self, ('', port), TCPRequestHandler,
@ -167,8 +168,8 @@ class TCPServer(DualStackTCPServer):
)
break
except OSError as e:
if e.args[0] == errno.EADDRINUSE: # address already in use
# this may happen despite of allow_reuse_address
if ntry < maxtry - 1 and e.args[0] == errno.EADDRINUSE: # address already in use
# this may happen after restarting for a short time even with allow_reuse_address
time.sleep(0.3 * (1 << ntry)) # max accumulated sleep time: 0.3 * 31 = 9.3 sec
else:
self.log.error('could not initialize TCP Server: %r', e)

View File

@ -156,38 +156,49 @@ class Server:
print(formatException(verbose=True))
raise
self.interfaces = []
self.interfaces = {}
iface_threads = []
interfaces_started = MultiEvent(default_timeout=1)#default_timeout=15)
# default_timeout 12 sec: TCPServer might need up to 10 sec to wait for Address no longer in use
interfaces_started = MultiEvent(default_timeout=12)
lock = threading.Lock()
failed = {}
interfaces = [self.node_cfg['interface']] + self.node_cfg.get('secondary', [])
# TODO: check if only one interface of each type is open?
for interface in [self.node_cfg['interface']] + self.node_cfg.get(
'secondary', []
):
for interface in interfaces:
opts = {'uri': interface}
t = mkthread(
self._interfaceThread,
opts,
lock,
self.interfaces.append,
failed,
interfaces_started.get_trigger(),
)
iface_threads.append(t)
interfaces_started.wait()
self.log.info('startup done, handling transport messages')
if not interfaces_started.wait():
for iface in interfaces:
if iface not in failed and iface not in self.interfaces:
self.log.error('timeout starting interface %s', iface)
while failed:
iface, err = failed.popitem()
self.log.error('starting interface %s failed with %r', iface, err)
if not self.interfaces:
self.log.error('no interface started')
return
self.log.info('startup done with interface(s) %s' % ', '.join(self.interfaces))
if systemd:
systemd.daemon.notify("READY=1\nSTATUS=accepting requests")
self.log.info('Started %d interfaces' % len(self.interfaces))
# we wait here on the thread finishing, which means we got a
# signal to shut down or an exception was raised
# TODO: get the exception (and re-raise?)
for t in iface_threads:
t.join()
self.log.info(f'stopped listenning, cleaning up'
f' {len(self.secnode.modules)} modules')
while failed:
iface, err = failed.popitem()
self.log.error('interface %s failed with %r', iface, err)
self.log.info('stopped listening, cleaning up %d modules',
len(self.secnode.modules))
# if systemd:
# if self._restart:
# systemd.daemon.notify('RELOADING=1')
@ -202,27 +213,33 @@ class Server:
def restart(self):
if not self._restart:
self._restart = True
for iface in self.interfaces:
for iface in self.interfaces.values():
iface.shutdown()
def shutdown(self):
self._restart = False
for iface in self.interfaces:
for iface in self.interfaces.values():
iface.shutdown()
def _interfaceThread(self, opts, lock, if_cb, start_cb):
scheme, _, _ = opts['uri'].rpartition('://')
def _interfaceThread(self, opts, lock, failed, start_cb):
iface = opts['uri']
scheme, _, _ = iface.rpartition('://')
scheme = scheme or 'tcp'
cls = get_class(self.INTERFACES[scheme])
with cls(scheme, self.log.getChild(scheme), opts, self) as interface:
if opts:
raise ConfigError(self.unknown_options(cls, opts))
try:
with cls(scheme, self.log.getChild(scheme), opts, self) as interface:
if opts:
raise ConfigError(self.unknown_options(cls, opts))
with lock:
self.interfaces[iface] = interface
start_cb()
interface.serve_forever()
# server_close() called by 'with'
except Exception as e:
with lock:
if_cb(interface)
failed[iface] = e
start_cb()
interface.serve_forever()
# server_close() called by 'with'
return
self.log.info(f'stopped {iface}')
def _processCfg(self):