diff --git a/frappy/protocol/interface/handler.py b/frappy/protocol/interface/handler.py new file mode 100644 index 0000000..b890155 --- /dev/null +++ b/frappy/protocol/interface/handler.py @@ -0,0 +1,231 @@ +# ***************************************************************************** +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free Software +# Foundation; either version 2 of the License, or (at your option) any later +# version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# this program; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Module authors: +# Enrico Faulhaber +# Markus Zolliker +# +# ***************************************************************************** +"""The common parts of the SECNodes outside interfaces""" + +import sys +import threading + +from frappy.errors import SECoPError +from frappy.lib import formatException, formatExtendedStack, \ + formatExtendedTraceback +from frappy.protocol.messages import ERRORPREFIX, HELPREPLY, HELPREQUEST, \ + HelpMessage + + +class DecodeError(Exception): + def __init__(self, message, raw_msg): + super().__init__(message) + self._raw_msg = raw_msg + + @property + def raw_msg(self): + return self._raw_msg + + +class ConnectionClose(Exception): + """Indicates that receive quit due to an error.""" + + +class RequestHandler: + """Base class for the request handlers. + + This is an extended copy of the BaseRequestHandler from socketserver. + + To make a new interface, implement these methods: + ingest, next_message, decode_message, receive, send_reply and format + and extend (override) setup() and finish() if needed. + + For an example, have a look at TCPRequestHandler. + """ + + # Methods from BaseRequestHandler + def __init__(self, request, client_address, server): + self.request = request + self.client_address = client_address + self.server = server + + self.setup() + try: + self.handle() + finally: + self.finish() + + def setup(self): + self.log = self.server.log + self.log.info("new connection %s", self.format()) + # notify dispatcher of us + self.server.dispatcher.add_connection(self) + self.send_lock = threading.Lock() + self.running = True + # overwrite this with an appropriate buffer if needed + self.data = None + + def handle(self): + """handle a new connection""" + # copy state info + serverobj = self.server + # copy relevant settings from Interface + detailed_errors = serverobj.detailed_errors + + # start serving + while self.running: + try: + newdata = self.receive() + if newdata is None: + # no new data during read, continue + continue + self.ingest(newdata) + except ConnectionClose: + # either normal close or error in receive + return + # put data into (de-) framer, + # de-frame data with next_message() and decode it + # call dispatcher.handle_request(self, message) + # dispatcher will queue the reply before returning + while self.running: + try: + msg = self.next_message() + if msg is None: + break # no more messages to process + except DecodeError as err: + # we have to decode 'origin' here + # use latin-1, as utf-8 or ascii may lead to encoding errors + msg = err.raw_msg.decode('latin-1').split(' ', 3) + [ + None + ] # make sure len(msg) > 1 + result = ( + ERRORPREFIX + msg[0], + msg[1], + [ + 'InternalError', str(err), + { + 'exception': formatException(), + 'traceback': formatExtendedStack() + } + ] + ) + print('--------------------') + print(formatException()) + print('--------------------') + print(formatExtendedTraceback(sys.exc_info())) + print('====================') + else: + try: + if msg[0] == HELPREQUEST: + self.handle_help() + result = (HELPREPLY, None, None) + else: + result = serverobj.dispatcher.handle_request(self, + msg) + except SECoPError as err: + result = ( + ERRORPREFIX + msg[0], + msg[1], + [ + err.name, + str(err), + { + 'exception': formatException(), + 'traceback': formatExtendedStack() + } + ] + ) + except Exception as err: + # create Error Obj instead + result = ( + ERRORPREFIX + msg[0], + msg[1], + [ + 'InternalError', + repr(err), + { + 'exception': formatException(), + 'traceback': formatExtendedStack() + } + ] + ) + print('--------------------') + print(formatException()) + print('--------------------') + print(formatExtendedTraceback(sys.exc_info())) + print('====================') + + if not result: + self.log.error('empty result upon msg %s', repr(msg)) + if result[0].startswith(ERRORPREFIX) and not detailed_errors: + # strip extra information + result[2][2].clear() + self.send_reply(result) + + def handle_help(self): + for idx, line in enumerate(HelpMessage.splitlines()): + # not sending HELPREPLY here, as there should be only one reply for + # every request + self.send_reply(('_', f'{idx + 1}', line)) + + def finish(self): + """called when handle() terminates, i.e. the socket closed""" + self.log.info('closing connection %s', self.format()) + # notify dispatcher + self.server.dispatcher.remove_connection(self) + + # Methods for implementing in derived classes: + def ingest(self, newdata): + """Put the new data into the buffer.""" + raise NotImplementedError + + def next_message(self): + """Get the next decoded message from the buffer. + + Has to return a triple of (MESSAGE, specifier, data) or None, in case + there are no further messages in the receive queue. + + If there is an Error during decoding, this method has to raise a + DecodeError. + """ + raise NotImplementedError + + def receive(self): + """Receive data from the link. + + Should return the received data or None if there was nothing new. Has + to raise a ConnectionClose on shutdown of the connection or on errors + that are not recoverable. + """ + raise NotImplementedError + + def send_reply(self, data): + """send reply + + stops recv loop on error + """ + raise NotImplementedError + + def format(self): + """ + Format available connection data into something recognizable for + logging. + + For example, the remote IP address or a connection identifier. + """ + raise NotImplementedError + +# TODO: server baseclass? diff --git a/frappy/protocol/interface/tcp.py b/frappy/protocol/interface/tcp.py index e957837..844b999 100644 --- a/frappy/protocol/interface/tcp.py +++ b/frappy/protocol/interface/tcp.py @@ -18,122 +18,77 @@ # Markus Zolliker # # ***************************************************************************** -"""provides tcp interface to the SECoP Server""" +"""TCP interface to the SECoP Server""" import errno import os import socket import socketserver -import sys -import threading import time from frappy.datatypes import BoolType, StringType -from frappy.errors import SECoPError -from frappy.lib import formatException, formatExtendedStack, \ - formatExtendedTraceback, SECoP_DEFAULT_PORT +from frappy.lib import SECoP_DEFAULT_PORT from frappy.properties import Property from frappy.protocol.interface import decode_msg, encode_msg_frame, get_msg -from frappy.protocol.messages import ERRORPREFIX, HELPREPLY, HELPREQUEST, \ - HelpMessage +from frappy.protocol.interface.handler import ConnectionClose, \ + RequestHandler, DecodeError +from frappy.protocol.messages import HELPREQUEST + MESSAGE_READ_SIZE = 1024 -HELP = HELPREQUEST.encode() -class TCPRequestHandler(socketserver.BaseRequestHandler): +def format_address(addr): + if len(addr) == 2: + return '%s:%d' % addr + address, port = addr[0:2] + if address.startswith('::ffff'): + return '%s:%d' % (address[7:], port) + return '[%s]:%d' % (address, port) + +class TCPRequestHandler(RequestHandler): def setup(self): - self.log = self.server.log - self.running = True - self.send_lock = threading.Lock() + super().setup() + self.request.settimeout(1) + self.data = b'' - def handle(self): - """handle a new tcp-connection""" - # copy state info - mysocket = self.request - clientaddr = self.client_address - serverobj = self.server + def finish(self): + """called when handle() terminates, i.e. the socket closed""" + super().finish() + # close socket + try: + self.request.shutdown(socket.SHUT_RDWR) + except Exception: + pass + finally: + self.request.close() - self.log.info("handling new connection from %s", format_address(clientaddr)) - data = b'' + def ingest(self, newdata): + self.data += newdata - # notify dispatcher of us - serverobj.dispatcher.add_connection(self) + def next_message(self): + try: + message, self.data = get_msg(self.data) + if message is None: + return None + if message.strip() == b'': + return (HELPREQUEST, None, None) + return decode_msg(message) + except Exception as e: + raise DecodeError('exception in receive', raw_msg=message) from e - # copy relevant settings from Interface - detailed_errors = serverobj.detailed_errors - - mysocket.settimeout(1) - # start serving - while self.running: - try: - newdata = mysocket.recv(MESSAGE_READ_SIZE) - if not newdata: - # no timeout error, but no new data -> connection closed - return - data = data + newdata - except socket.timeout: - continue - except socket.error as e: - self.log.exception(e) - return + def receive(self): + try: + data = self.request.recv(MESSAGE_READ_SIZE) if not data: - continue - # put data into (de-) framer, - # put frames into (de-) coder and if a message appear, - # call dispatcher.handle_request(self, message) - # dispatcher will queue the reply before returning - while self.running: - origin, data = get_msg(data) - if origin is None: - break # no more messages to process - origin = origin.strip() - if origin in (HELP, b''): # empty string -> send help message - for idx, line in enumerate(HelpMessage.splitlines()): - # not sending HELPREPLY here, as there should be only one reply for every request - self.send_reply(('_', f'{idx + 1}', line)) - # ident matches request - self.send_reply((HELPREPLY, None, None)) - continue - try: - msg = decode_msg(origin) - except Exception as err: - # we have to decode 'origin' here - # use latin-1, as utf-8 or ascii may lead to encoding errors - msg = origin.decode('latin-1').split(' ', 3) + [None] # make sure len(msg) > 1 - result = (ERRORPREFIX + msg[0], msg[1], ['InternalError', str(err), - {'exception': formatException(), - 'traceback': formatExtendedStack()}]) - print('--------------------') - print(formatException()) - print('--------------------') - print(formatExtendedTraceback(sys.exc_info())) - print('====================') - else: - try: - result = serverobj.dispatcher.handle_request(self, msg) - except SECoPError as err: - result = (ERRORPREFIX + msg[0], msg[1], [err.name, str(err), - {'exception': formatException(), - 'traceback': formatExtendedStack()}]) - except Exception as err: - # create Error Obj instead - result = (ERRORPREFIX + msg[0], msg[1], ['InternalError', repr(err), - {'exception': formatException(), - 'traceback': formatExtendedStack()}]) - print('--------------------') - print(formatException()) - print('--------------------') - print(formatExtendedTraceback(sys.exc_info())) - print('====================') - - if not result: - self.log.error('empty result upon msg %s', repr(msg)) - if result[0].startswith(ERRORPREFIX) and not detailed_errors: - # strip extra information - result[2][2].clear() - self.send_reply(result) + raise ConnectionClose('socket was closed') + return data + except socket.timeout: + return None + except socket.error as e: + self.log.exception(e) + raise ConnectionClose() from e def send_reply(self, data): """send reply @@ -156,18 +111,9 @@ class TCPRequestHandler(socketserver.BaseRequestHandler): self.log.error('ERROR in send_reply %r', e) self.running = False - def finish(self): - """called when handle() terminates, i.e. the socket closed""" - self.log.info('closing connection from %s', format_address(self.client_address)) - # notify dispatcher - self.server.dispatcher.remove_connection(self) - # close socket - try: - self.request.shutdown(socket.SHUT_RDWR) - except Exception: - pass - finally: - self.request.close() + def format(self): + return f'from {format_address(self.client_address)}' + class DualStackTCPServer(socketserver.ThreadingTCPServer): """Subclassed to provide IPv6 capabilities as socketserver only uses IPv4""" @@ -230,12 +176,3 @@ class TCPServer(DualStackTCPServer): if ntry: self.log.warning('tried again %d times after "Address already in use"', ntry) self.log.info("TCPServer initiated") - - -def format_address(addr): - if len(addr) == 2: - return '%s:%d' % addr - address, port = addr[0:2] - if address.startswith('::ffff'): - return '%s:%d' % (address[7:], port) - return '[%s]:%d' % (address, port)