From 6514a1b2eeae8a674a21aebe7793c7be5655c3f8 Mon Sep 17 00:00:00 2001 From: Alexander Zaft Date: Thu, 7 Mar 2024 13:05:32 +0100 Subject: [PATCH] core: add websocket interface Change-Id: Ic62abeef6fb73f4a1b3d29f9225ba164de9e3e93 Reviewed-on: https://forge.frm2.tum.de/review/c/secop/frappy/+/33240 Tested-by: Jenkins Automated Tests Reviewed-by: Enrico Faulhaber Reviewed-by: Alexander Zaft --- frappy/protocol/interface/ws.py | 160 ++++++++++++++++++++++++++++++++ frappy/server.py | 1 + requirements.txt | 2 + 3 files changed, 163 insertions(+) create mode 100644 frappy/protocol/interface/ws.py diff --git a/frappy/protocol/interface/ws.py b/frappy/protocol/interface/ws.py new file mode 100644 index 0000000..64059d8 --- /dev/null +++ b/frappy/protocol/interface/ws.py @@ -0,0 +1,160 @@ +# ***************************************************************************** +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free Software +# Foundation; either version 2 of the License, or (at your option) any later +# version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# this program; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Module authors: +# Alexander Zaft +# +# ***************************************************************************** + +import json +from functools import partial + +from websockets.exceptions import ConnectionClosedOK, ConnectionClosedError +from websockets.sync.server import CloseCode, serve + +from frappy.protocol.interface.handler import ConnectionClose, \ + RequestHandler, DecodeError +from frappy.protocol.messages import HELPREQUEST + + +def encode_msg_frame_str(action, specifier=None, data=None): + """ encode a msg_triple into an msg_frame, ready to be sent + + action (and optional specifier) are str strings, + data may be an json-yfied python object""" + msg = (action, specifier or '', '' if data is None else json.dumps(data)) + return ' '.join(msg).strip() + + +class WSRequestHandler(RequestHandler): + """Handles a Websocket connection.""" + + def __init__(self, conn, server): + self.conn = conn + client_address = conn.remote_address + request = conn.socket + super().__init__(request, client_address, server) + + def setup(self): + super().setup() + self.server.connections.add(self) + + def finish(self): + """called when handle() terminates, i.e. the socket closed""" + super().finish() + self.server.connections.discard(self) + # this will be called for a second time if the server is shutting down, + # but in that case it will be a no-op + self.conn.close() + + def ingest(self, newdata): + # recv on the websocket connection returns one message, we don't save + # anything in data + self.data = newdata + + def next_message(self): + """split the string into a message triple.""" + if self.data is None: + return None + try: + message = self.data.strip() + if message == '': + return HELPREQUEST, None, None + res = message.split(' ', 2) + ['', ''] + action, specifier, data = res[0:3] + self.data = None + return ( + action, + specifier or None, + None if data == '' else json.loads(data) + ) + except Exception as e: + raise DecodeError('exception when reading in message', + raw_msg=bytes(message, 'utf-8')) from e + + def receive(self): + """receives one message from the websocket.""" + try: + return self.conn.recv() + except TimeoutError: + return None + except ConnectionClosedOK: + raise ConnectionClose from None + except ConnectionClosedError as e: + self.log.error('No close frame received from %s', self.format()) + raise ConnectionClose from e + except OSError as e: + self.log.exception(e) + raise ConnectionClose from e + + def send_reply(self, data): + """send reply + + stops recv loop on error (including timeout when output buffer full for + more than 1 sec) + """ + if not data: + self.log.error('should not reply empty data!') + return + outdata = encode_msg_frame_str(*data) + with self.send_lock: + if self.running: + try: + self.conn.send(outdata) + except (BrokenPipeError, IOError) as e: + self.log.debug('send_reply got an %r, connection closed?', + e) + self.running = False + except Exception as e: + self.log.error('ERROR in send_reply %r', e) + self.running = False + + def format(self): + return f'{self.conn.id} from {self.client_address}' + +class WSServer: + """Server for providing a websocket interface. + + Implementation note: + The websockets library doesn't provide an option to subclass its server, so + we take the returned value as an attribute and provide the neccessary + function calls. + """ + def __init__(self, name, logger, options, srv): + self.connections = set() # keep track for shutting down + self.dispatcher = srv.dispatcher + self.name = name + self.log = logger + self.port = int(options.pop('uri').split('://', 1)[-1]) + self.detailed_errors = options.pop('detailed_errors', False) + + handle = partial(WSRequestHandler, server=self) + # websockets only gives the serve method without an option to subclass + self.ws_server = serve(handle, '', self.port, logger=logger) + self.log.info("Websocket server %s binding to port %d", name, self.port) + + def serve_forever(self): + self.ws_server.serve_forever() + + def shutdown(self): + for c in list(self.connections): + c.conn.close(code=CloseCode.GOING_AWAY, reason='shutting down') + self.ws_server.shutdown() + + def __enter__(self): + return self + + def __exit__(self, *args): + return self.shutdown() diff --git a/frappy/server.py b/frappy/server.py index fb14127..2bf84f5 100644 --- a/frappy/server.py +++ b/frappy/server.py @@ -53,6 +53,7 @@ except ImportError: class Server: INTERFACES = { 'tcp': 'protocol.interface.tcp.TCPServer', + 'ws': 'protocol.interface.ws.WSServer', } _restart = True diff --git a/requirements.txt b/requirements.txt index 3bf325a..3991c95 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,6 +5,8 @@ mlzlog >=0.2.0 # daemonizing psutil python-daemon >=2.0 +# websocket interface: +websockets>=11.0 # for zmq interface #pyzmq>=13.1.0 #for ppms on windows