From 34793f8bd0a06dcffeb6cb39bb1e641736d2eb50 Mon Sep 17 00:00:00 2001 From: Alexander Zaft Date: Tue, 7 Nov 2023 11:03:16 +0100 Subject: [PATCH] core: allow multiple interfaces Change-Id: Ib8c0baef85a6dd69cddafe1c4973e42136d1588b Reviewed-on: https://forge.frm2.tum.de/review/c/secop/frappy/+/32489 Reviewed-by: Markus Zolliker Reviewed-by: Alexander Zaft Tested-by: Jenkins Automated Tests --- frappy/server.py | 67 +++++++++++++++++++++++++++++++++--------------- 1 file changed, 47 insertions(+), 20 deletions(-) diff --git a/frappy/server.py b/frappy/server.py index 36aea2a..d524ab6 100644 --- a/frappy/server.py +++ b/frappy/server.py @@ -25,6 +25,7 @@ import os import signal import sys +import threading from frappy.config import load_config from frappy.errors import ConfigError @@ -107,8 +108,8 @@ class Server: signal.signal(signal.SIGINT, self.signal_handler) signal.signal(signal.SIGTERM, self.signal_handler) - def signal_handler(self, _num, _frame): - if hasattr(self, 'interface') and self.interface: + def signal_handler(self, num, frame): + if hasattr(self, 'interfaces') and self.interfaces: self.shutdown() def start(self): @@ -151,23 +152,32 @@ class Server: print(formatException(verbose=True)) raise - opts = {'uri': self.node_cfg['interface']} - scheme, _, _ = opts['uri'].rpartition('://') - scheme = scheme or 'tcp' - cls = get_class(self.INTERFACES[scheme]) - with cls(scheme, self.log.getChild(scheme), opts, self) as self.interface: - if opts: - raise ConfigError(self.unknown_options(cls, opts)) - self.log.info('startup done, handling transport messages') - if systemd: - systemd.daemon.notify("READY=1\nSTATUS=accepting requests") - t = mkthread(self.interface.serve_forever) - # we wait here on the thread finishing, which means we got a - # signal to shut down or an exception was raised - # TODO: get the exception (and re-raise?) + self.interfaces = [] + iface_threads = [] + interfaces_started = MultiEvent(default_timeout=1)#default_timeout=15) + lock = threading.Lock() + for interface in [self.node_cfg['interface']] + self.node_cfg.get('interfaces', []): + opts = {'uri': interface} + t = mkthread( + self._interfaceThread, + opts, + lock, + self.interfaces.append, + interfaces_started.get_trigger(), + ) + iface_threads.append(t) + interfaces_started.wait() + + self.log.info('startup done, handling transport messages') + if systemd: + systemd.daemon.notify("READY=1\nSTATUS=accepting requests") + + self.log.info('Started %d interfaces' % len(self.interfaces)) + # we wait here on the thread finishing, which means we got a + # signal to shut down or an exception was raised + # TODO: get the exception (and re-raise?) + for t in iface_threads: t.join() - self.interface = None # fine due to the semantics of 'with' - # server_close() called by 'with' self.log.info(f'stopped listenning, cleaning up' f' {len(self.secnode.modules)} modules') @@ -185,11 +195,28 @@ class Server: def restart(self): if not self._restart: self._restart = True - self.interface.shutdown() + for iface in self.interfaces: + iface.shutdown() def shutdown(self): self._restart = False - self.interface.shutdown() + for iface in self.interfaces: + iface.shutdown() + + def _interfaceThread(self, opts, lock, if_cb, start_cb): + scheme, _, _ = opts['uri'].rpartition('://') + iface = opts['uri'] + scheme = scheme or 'tcp' + cls = get_class(self.INTERFACES[scheme]) + with cls(scheme, self.log.getChild(scheme), opts, self) as interface: + if opts: + raise ConfigError(self.unknown_options(cls, opts)) + with lock: + if_cb(interface) + start_cb() + interface.serve_forever() + # server_close() called by 'with' + self.log.info(f'stopped {iface}') def _processCfg(self): """Processes the module configuration.