core: allow multiple interfaces

Change-Id: Ib8c0baef85a6dd69cddafe1c4973e42136d1588b
Reviewed-on: https://forge.frm2.tum.de/review/c/secop/frappy/+/32489
Reviewed-by: Markus Zolliker <markus.zolliker@psi.ch>
Reviewed-by: Alexander Zaft <a.zaft@fz-juelich.de>
Tested-by: Jenkins Automated Tests <pedersen+jenkins@frm2.tum.de>
This commit is contained in:
Alexander Zaft 2023-11-07 11:03:16 +01:00 committed by Markus Zolliker
parent 7333ccd7a6
commit 34793f8bd0

View File

@ -25,6 +25,7 @@
import os
import signal
import sys
import threading
from frappy.config import load_config
from frappy.errors import ConfigError
@ -107,8 +108,8 @@ class Server:
signal.signal(signal.SIGINT, self.signal_handler)
signal.signal(signal.SIGTERM, self.signal_handler)
def signal_handler(self, _num, _frame):
if hasattr(self, 'interface') and self.interface:
def signal_handler(self, num, frame):
if hasattr(self, 'interfaces') and self.interfaces:
self.shutdown()
def start(self):
@ -151,23 +152,32 @@ class Server:
print(formatException(verbose=True))
raise
opts = {'uri': self.node_cfg['interface']}
scheme, _, _ = opts['uri'].rpartition('://')
scheme = scheme or 'tcp'
cls = get_class(self.INTERFACES[scheme])
with cls(scheme, self.log.getChild(scheme), opts, self) as self.interface:
if opts:
raise ConfigError(self.unknown_options(cls, opts))
self.interfaces = []
iface_threads = []
interfaces_started = MultiEvent(default_timeout=1)#default_timeout=15)
lock = threading.Lock()
for interface in [self.node_cfg['interface']] + self.node_cfg.get('interfaces', []):
opts = {'uri': interface}
t = mkthread(
self._interfaceThread,
opts,
lock,
self.interfaces.append,
interfaces_started.get_trigger(),
)
iface_threads.append(t)
interfaces_started.wait()
self.log.info('startup done, handling transport messages')
if systemd:
systemd.daemon.notify("READY=1\nSTATUS=accepting requests")
t = mkthread(self.interface.serve_forever)
self.log.info('Started %d interfaces' % len(self.interfaces))
# we wait here on the thread finishing, which means we got a
# signal to shut down or an exception was raised
# TODO: get the exception (and re-raise?)
for t in iface_threads:
t.join()
self.interface = None # fine due to the semantics of 'with'
# server_close() called by 'with'
self.log.info(f'stopped listenning, cleaning up'
f' {len(self.secnode.modules)} modules')
@ -185,11 +195,28 @@ class Server:
def restart(self):
if not self._restart:
self._restart = True
self.interface.shutdown()
for iface in self.interfaces:
iface.shutdown()
def shutdown(self):
self._restart = False
self.interface.shutdown()
for iface in self.interfaces:
iface.shutdown()
def _interfaceThread(self, opts, lock, if_cb, start_cb):
scheme, _, _ = opts['uri'].rpartition('://')
iface = opts['uri']
scheme = scheme or 'tcp'
cls = get_class(self.INTERFACES[scheme])
with cls(scheme, self.log.getChild(scheme), opts, self) as interface:
if opts:
raise ConfigError(self.unknown_options(cls, opts))
with lock:
if_cb(interface)
start_cb()
interface.serve_forever()
# server_close() called by 'with'
self.log.info(f'stopped {iface}')
def _processCfg(self):
"""Processes the module configuration.