diff --git a/frappy/protocol/interface/tcp.py b/frappy/protocol/interface/tcp.py index 844b999..63ca2b9 100644 --- a/frappy/protocol/interface/tcp.py +++ b/frappy/protocol/interface/tcp.py @@ -159,7 +159,8 @@ class TCPServer(DualStackTCPServer): self.detailed_errors = options.pop('detailed_errors', False) self.log.info("TCPServer %s binding to port %d", name, port) - for ntry in range(5): + maxtry = 5 + for ntry in range(maxtry): try: DualStackTCPServer.__init__( self, ('', port), TCPRequestHandler, @@ -167,8 +168,8 @@ class TCPServer(DualStackTCPServer): ) break except OSError as e: - if e.args[0] == errno.EADDRINUSE: # address already in use - # this may happen despite of allow_reuse_address + if ntry < maxtry - 1 and e.args[0] == errno.EADDRINUSE: # address already in use + # this may happen after restarting for a short time even with allow_reuse_address time.sleep(0.3 * (1 << ntry)) # max accumulated sleep time: 0.3 * 31 = 9.3 sec else: self.log.error('could not initialize TCP Server: %r', e) diff --git a/frappy/server.py b/frappy/server.py index 2bf84f5..f00599a 100644 --- a/frappy/server.py +++ b/frappy/server.py @@ -156,38 +156,49 @@ class Server: print(formatException(verbose=True)) raise - self.interfaces = [] + self.interfaces = {} iface_threads = [] - interfaces_started = MultiEvent(default_timeout=1)#default_timeout=15) + # default_timeout 12 sec: TCPServer might need up to 10 sec to wait for Address no longer in use + interfaces_started = MultiEvent(default_timeout=12) lock = threading.Lock() + failed = {} + interfaces = [self.node_cfg['interface']] + self.node_cfg.get('secondary', []) # TODO: check if only one interface of each type is open? - for interface in [self.node_cfg['interface']] + self.node_cfg.get( - 'secondary', [] - ): + for interface in interfaces: opts = {'uri': interface} t = mkthread( self._interfaceThread, opts, lock, - self.interfaces.append, + failed, interfaces_started.get_trigger(), ) iface_threads.append(t) - interfaces_started.wait() - - self.log.info('startup done, handling transport messages') + if not interfaces_started.wait(): + for iface in interfaces: + if iface not in failed and iface not in self.interfaces: + self.log.error('timeout starting interface %s', iface) + while failed: + iface, err = failed.popitem() + self.log.error('starting interface %s failed with %r', iface, err) + if not self.interfaces: + self.log.error('no interface started') + return + self.log.info('startup done with interface(s) %s' % ', '.join(self.interfaces)) if systemd: systemd.daemon.notify("READY=1\nSTATUS=accepting requests") - self.log.info('Started %d interfaces' % len(self.interfaces)) # we wait here on the thread finishing, which means we got a # signal to shut down or an exception was raised - # TODO: get the exception (and re-raise?) for t in iface_threads: t.join() - self.log.info(f'stopped listenning, cleaning up' - f' {len(self.secnode.modules)} modules') + while failed: + iface, err = failed.popitem() + self.log.error('interface %s failed with %r', iface, err) + + self.log.info('stopped listening, cleaning up %d modules', + len(self.secnode.modules)) # if systemd: # if self._restart: # systemd.daemon.notify('RELOADING=1') @@ -202,27 +213,33 @@ class Server: def restart(self): if not self._restart: self._restart = True - for iface in self.interfaces: + for iface in self.interfaces.values(): iface.shutdown() def shutdown(self): self._restart = False - for iface in self.interfaces: + for iface in self.interfaces.values(): iface.shutdown() - def _interfaceThread(self, opts, lock, if_cb, start_cb): - scheme, _, _ = opts['uri'].rpartition('://') + def _interfaceThread(self, opts, lock, failed, start_cb): iface = opts['uri'] + scheme, _, _ = iface.rpartition('://') scheme = scheme or 'tcp' cls = get_class(self.INTERFACES[scheme]) - with cls(scheme, self.log.getChild(scheme), opts, self) as interface: - if opts: - raise ConfigError(self.unknown_options(cls, opts)) + try: + with cls(scheme, self.log.getChild(scheme), opts, self) as interface: + if opts: + raise ConfigError(self.unknown_options(cls, opts)) + with lock: + self.interfaces[iface] = interface + start_cb() + interface.serve_forever() + # server_close() called by 'with' + except Exception as e: with lock: - if_cb(interface) + failed[iface] = e start_cb() - interface.serve_forever() - # server_close() called by 'with' + return self.log.info(f'stopped {iface}') def _processCfg(self):