core: introduce common handler class
- make RequestHanlder based on socketserver.BaserequestHandler - split handle() into subfunctions - rework TCPRequestHandler Change-Id: I62452e21c03b9cb9937673ce9c8663765798f863 Reviewed-on: https://forge.frm2.tum.de/review/c/secop/frappy/+/32984 Tested-by: Jenkins Automated Tests <pedersen+jenkins@frm2.tum.de> Reviewed-by: Markus Zolliker <markus.zolliker@psi.ch> Reviewed-by: Enrico Faulhaber <enrico.faulhaber@frm2.tum.de> Reviewed-by: Alexander Zaft <a.zaft@fz-juelich.de>
This commit is contained in:
parent
c63f98f3cb
commit
118e22ee44
231
frappy/protocol/interface/handler.py
Normal file
231
frappy/protocol/interface/handler.py
Normal file
@ -0,0 +1,231 @@
|
|||||||
|
# *****************************************************************************
|
||||||
|
# This program is free software; you can redistribute it and/or modify it under
|
||||||
|
# the terms of the GNU General Public License as published by the Free Software
|
||||||
|
# Foundation; either version 2 of the License, or (at your option) any later
|
||||||
|
# version.
|
||||||
|
#
|
||||||
|
# This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
||||||
|
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
|
||||||
|
# details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU General Public License along with
|
||||||
|
# this program; if not, write to the Free Software Foundation, Inc.,
|
||||||
|
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
||||||
|
#
|
||||||
|
# Module authors:
|
||||||
|
# Enrico Faulhaber <enrico.faulhaber@frm2.tum.de>
|
||||||
|
# Markus Zolliker <markus.zolliker@psi.ch>
|
||||||
|
#
|
||||||
|
# *****************************************************************************
|
||||||
|
"""The common parts of the SECNodes outside interfaces"""
|
||||||
|
|
||||||
|
import sys
|
||||||
|
import threading
|
||||||
|
|
||||||
|
from frappy.errors import SECoPError
|
||||||
|
from frappy.lib import formatException, formatExtendedStack, \
|
||||||
|
formatExtendedTraceback
|
||||||
|
from frappy.protocol.messages import ERRORPREFIX, HELPREPLY, HELPREQUEST, \
|
||||||
|
HelpMessage
|
||||||
|
|
||||||
|
|
||||||
|
class DecodeError(Exception):
|
||||||
|
def __init__(self, message, raw_msg):
|
||||||
|
super().__init__(message)
|
||||||
|
self._raw_msg = raw_msg
|
||||||
|
|
||||||
|
@property
|
||||||
|
def raw_msg(self):
|
||||||
|
return self._raw_msg
|
||||||
|
|
||||||
|
|
||||||
|
class ConnectionClose(Exception):
|
||||||
|
"""Indicates that receive quit due to an error."""
|
||||||
|
|
||||||
|
|
||||||
|
class RequestHandler:
|
||||||
|
"""Base class for the request handlers.
|
||||||
|
|
||||||
|
This is an extended copy of the BaseRequestHandler from socketserver.
|
||||||
|
|
||||||
|
To make a new interface, implement these methods:
|
||||||
|
ingest, next_message, decode_message, receive, send_reply and format
|
||||||
|
and extend (override) setup() and finish() if needed.
|
||||||
|
|
||||||
|
For an example, have a look at TCPRequestHandler.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Methods from BaseRequestHandler
|
||||||
|
def __init__(self, request, client_address, server):
|
||||||
|
self.request = request
|
||||||
|
self.client_address = client_address
|
||||||
|
self.server = server
|
||||||
|
|
||||||
|
self.setup()
|
||||||
|
try:
|
||||||
|
self.handle()
|
||||||
|
finally:
|
||||||
|
self.finish()
|
||||||
|
|
||||||
|
def setup(self):
|
||||||
|
self.log = self.server.log
|
||||||
|
self.log.info("new connection %s", self.format())
|
||||||
|
# notify dispatcher of us
|
||||||
|
self.server.dispatcher.add_connection(self)
|
||||||
|
self.send_lock = threading.Lock()
|
||||||
|
self.running = True
|
||||||
|
# overwrite this with an appropriate buffer if needed
|
||||||
|
self.data = None
|
||||||
|
|
||||||
|
def handle(self):
|
||||||
|
"""handle a new connection"""
|
||||||
|
# copy state info
|
||||||
|
serverobj = self.server
|
||||||
|
# copy relevant settings from Interface
|
||||||
|
detailed_errors = serverobj.detailed_errors
|
||||||
|
|
||||||
|
# start serving
|
||||||
|
while self.running:
|
||||||
|
try:
|
||||||
|
newdata = self.receive()
|
||||||
|
if newdata is None:
|
||||||
|
# no new data during read, continue
|
||||||
|
continue
|
||||||
|
self.ingest(newdata)
|
||||||
|
except ConnectionClose:
|
||||||
|
# either normal close or error in receive
|
||||||
|
return
|
||||||
|
# put data into (de-) framer,
|
||||||
|
# de-frame data with next_message() and decode it
|
||||||
|
# call dispatcher.handle_request(self, message)
|
||||||
|
# dispatcher will queue the reply before returning
|
||||||
|
while self.running:
|
||||||
|
try:
|
||||||
|
msg = self.next_message()
|
||||||
|
if msg is None:
|
||||||
|
break # no more messages to process
|
||||||
|
except DecodeError as err:
|
||||||
|
# we have to decode 'origin' here
|
||||||
|
# use latin-1, as utf-8 or ascii may lead to encoding errors
|
||||||
|
msg = err.raw_msg.decode('latin-1').split(' ', 3) + [
|
||||||
|
None
|
||||||
|
] # make sure len(msg) > 1
|
||||||
|
result = (
|
||||||
|
ERRORPREFIX + msg[0],
|
||||||
|
msg[1],
|
||||||
|
[
|
||||||
|
'InternalError', str(err),
|
||||||
|
{
|
||||||
|
'exception': formatException(),
|
||||||
|
'traceback': formatExtendedStack()
|
||||||
|
}
|
||||||
|
]
|
||||||
|
)
|
||||||
|
print('--------------------')
|
||||||
|
print(formatException())
|
||||||
|
print('--------------------')
|
||||||
|
print(formatExtendedTraceback(sys.exc_info()))
|
||||||
|
print('====================')
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
if msg[0] == HELPREQUEST:
|
||||||
|
self.handle_help()
|
||||||
|
result = (HELPREPLY, None, None)
|
||||||
|
else:
|
||||||
|
result = serverobj.dispatcher.handle_request(self,
|
||||||
|
msg)
|
||||||
|
except SECoPError as err:
|
||||||
|
result = (
|
||||||
|
ERRORPREFIX + msg[0],
|
||||||
|
msg[1],
|
||||||
|
[
|
||||||
|
err.name,
|
||||||
|
str(err),
|
||||||
|
{
|
||||||
|
'exception': formatException(),
|
||||||
|
'traceback': formatExtendedStack()
|
||||||
|
}
|
||||||
|
]
|
||||||
|
)
|
||||||
|
except Exception as err:
|
||||||
|
# create Error Obj instead
|
||||||
|
result = (
|
||||||
|
ERRORPREFIX + msg[0],
|
||||||
|
msg[1],
|
||||||
|
[
|
||||||
|
'InternalError',
|
||||||
|
repr(err),
|
||||||
|
{
|
||||||
|
'exception': formatException(),
|
||||||
|
'traceback': formatExtendedStack()
|
||||||
|
}
|
||||||
|
]
|
||||||
|
)
|
||||||
|
print('--------------------')
|
||||||
|
print(formatException())
|
||||||
|
print('--------------------')
|
||||||
|
print(formatExtendedTraceback(sys.exc_info()))
|
||||||
|
print('====================')
|
||||||
|
|
||||||
|
if not result:
|
||||||
|
self.log.error('empty result upon msg %s', repr(msg))
|
||||||
|
if result[0].startswith(ERRORPREFIX) and not detailed_errors:
|
||||||
|
# strip extra information
|
||||||
|
result[2][2].clear()
|
||||||
|
self.send_reply(result)
|
||||||
|
|
||||||
|
def handle_help(self):
|
||||||
|
for idx, line in enumerate(HelpMessage.splitlines()):
|
||||||
|
# not sending HELPREPLY here, as there should be only one reply for
|
||||||
|
# every request
|
||||||
|
self.send_reply(('_', f'{idx + 1}', line))
|
||||||
|
|
||||||
|
def finish(self):
|
||||||
|
"""called when handle() terminates, i.e. the socket closed"""
|
||||||
|
self.log.info('closing connection %s', self.format())
|
||||||
|
# notify dispatcher
|
||||||
|
self.server.dispatcher.remove_connection(self)
|
||||||
|
|
||||||
|
# Methods for implementing in derived classes:
|
||||||
|
def ingest(self, newdata):
|
||||||
|
"""Put the new data into the buffer."""
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
def next_message(self):
|
||||||
|
"""Get the next decoded message from the buffer.
|
||||||
|
|
||||||
|
Has to return a triple of (MESSAGE, specifier, data) or None, in case
|
||||||
|
there are no further messages in the receive queue.
|
||||||
|
|
||||||
|
If there is an Error during decoding, this method has to raise a
|
||||||
|
DecodeError.
|
||||||
|
"""
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
def receive(self):
|
||||||
|
"""Receive data from the link.
|
||||||
|
|
||||||
|
Should return the received data or None if there was nothing new. Has
|
||||||
|
to raise a ConnectionClose on shutdown of the connection or on errors
|
||||||
|
that are not recoverable.
|
||||||
|
"""
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
def send_reply(self, data):
|
||||||
|
"""send reply
|
||||||
|
|
||||||
|
stops recv loop on error
|
||||||
|
"""
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
def format(self):
|
||||||
|
"""
|
||||||
|
Format available connection data into something recognizable for
|
||||||
|
logging.
|
||||||
|
|
||||||
|
For example, the remote IP address or a connection identifier.
|
||||||
|
"""
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
# TODO: server baseclass?
|
@ -18,122 +18,77 @@
|
|||||||
# Markus Zolliker <markus.zolliker@psi.ch>
|
# Markus Zolliker <markus.zolliker@psi.ch>
|
||||||
#
|
#
|
||||||
# *****************************************************************************
|
# *****************************************************************************
|
||||||
"""provides tcp interface to the SECoP Server"""
|
"""TCP interface to the SECoP Server"""
|
||||||
|
|
||||||
import errno
|
import errno
|
||||||
import os
|
import os
|
||||||
import socket
|
import socket
|
||||||
import socketserver
|
import socketserver
|
||||||
import sys
|
|
||||||
import threading
|
|
||||||
import time
|
import time
|
||||||
|
|
||||||
from frappy.datatypes import BoolType, StringType
|
from frappy.datatypes import BoolType, StringType
|
||||||
from frappy.errors import SECoPError
|
from frappy.lib import SECoP_DEFAULT_PORT
|
||||||
from frappy.lib import formatException, formatExtendedStack, \
|
|
||||||
formatExtendedTraceback, SECoP_DEFAULT_PORT
|
|
||||||
from frappy.properties import Property
|
from frappy.properties import Property
|
||||||
from frappy.protocol.interface import decode_msg, encode_msg_frame, get_msg
|
from frappy.protocol.interface import decode_msg, encode_msg_frame, get_msg
|
||||||
from frappy.protocol.messages import ERRORPREFIX, HELPREPLY, HELPREQUEST, \
|
from frappy.protocol.interface.handler import ConnectionClose, \
|
||||||
HelpMessage
|
RequestHandler, DecodeError
|
||||||
|
from frappy.protocol.messages import HELPREQUEST
|
||||||
|
|
||||||
|
|
||||||
MESSAGE_READ_SIZE = 1024
|
MESSAGE_READ_SIZE = 1024
|
||||||
HELP = HELPREQUEST.encode()
|
|
||||||
|
|
||||||
|
|
||||||
class TCPRequestHandler(socketserver.BaseRequestHandler):
|
def format_address(addr):
|
||||||
|
if len(addr) == 2:
|
||||||
|
return '%s:%d' % addr
|
||||||
|
address, port = addr[0:2]
|
||||||
|
if address.startswith('::ffff'):
|
||||||
|
return '%s:%d' % (address[7:], port)
|
||||||
|
return '[%s]:%d' % (address, port)
|
||||||
|
|
||||||
|
|
||||||
|
class TCPRequestHandler(RequestHandler):
|
||||||
def setup(self):
|
def setup(self):
|
||||||
self.log = self.server.log
|
super().setup()
|
||||||
self.running = True
|
self.request.settimeout(1)
|
||||||
self.send_lock = threading.Lock()
|
self.data = b''
|
||||||
|
|
||||||
def handle(self):
|
def finish(self):
|
||||||
"""handle a new tcp-connection"""
|
"""called when handle() terminates, i.e. the socket closed"""
|
||||||
# copy state info
|
super().finish()
|
||||||
mysocket = self.request
|
# close socket
|
||||||
clientaddr = self.client_address
|
try:
|
||||||
serverobj = self.server
|
self.request.shutdown(socket.SHUT_RDWR)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
self.request.close()
|
||||||
|
|
||||||
self.log.info("handling new connection from %s", format_address(clientaddr))
|
def ingest(self, newdata):
|
||||||
data = b''
|
self.data += newdata
|
||||||
|
|
||||||
# notify dispatcher of us
|
def next_message(self):
|
||||||
serverobj.dispatcher.add_connection(self)
|
try:
|
||||||
|
message, self.data = get_msg(self.data)
|
||||||
|
if message is None:
|
||||||
|
return None
|
||||||
|
if message.strip() == b'':
|
||||||
|
return (HELPREQUEST, None, None)
|
||||||
|
return decode_msg(message)
|
||||||
|
except Exception as e:
|
||||||
|
raise DecodeError('exception in receive', raw_msg=message) from e
|
||||||
|
|
||||||
# copy relevant settings from Interface
|
def receive(self):
|
||||||
detailed_errors = serverobj.detailed_errors
|
try:
|
||||||
|
data = self.request.recv(MESSAGE_READ_SIZE)
|
||||||
mysocket.settimeout(1)
|
|
||||||
# start serving
|
|
||||||
while self.running:
|
|
||||||
try:
|
|
||||||
newdata = mysocket.recv(MESSAGE_READ_SIZE)
|
|
||||||
if not newdata:
|
|
||||||
# no timeout error, but no new data -> connection closed
|
|
||||||
return
|
|
||||||
data = data + newdata
|
|
||||||
except socket.timeout:
|
|
||||||
continue
|
|
||||||
except socket.error as e:
|
|
||||||
self.log.exception(e)
|
|
||||||
return
|
|
||||||
if not data:
|
if not data:
|
||||||
continue
|
raise ConnectionClose('socket was closed')
|
||||||
# put data into (de-) framer,
|
return data
|
||||||
# put frames into (de-) coder and if a message appear,
|
except socket.timeout:
|
||||||
# call dispatcher.handle_request(self, message)
|
return None
|
||||||
# dispatcher will queue the reply before returning
|
except socket.error as e:
|
||||||
while self.running:
|
self.log.exception(e)
|
||||||
origin, data = get_msg(data)
|
raise ConnectionClose() from e
|
||||||
if origin is None:
|
|
||||||
break # no more messages to process
|
|
||||||
origin = origin.strip()
|
|
||||||
if origin in (HELP, b''): # empty string -> send help message
|
|
||||||
for idx, line in enumerate(HelpMessage.splitlines()):
|
|
||||||
# not sending HELPREPLY here, as there should be only one reply for every request
|
|
||||||
self.send_reply(('_', f'{idx + 1}', line))
|
|
||||||
# ident matches request
|
|
||||||
self.send_reply((HELPREPLY, None, None))
|
|
||||||
continue
|
|
||||||
try:
|
|
||||||
msg = decode_msg(origin)
|
|
||||||
except Exception as err:
|
|
||||||
# we have to decode 'origin' here
|
|
||||||
# use latin-1, as utf-8 or ascii may lead to encoding errors
|
|
||||||
msg = origin.decode('latin-1').split(' ', 3) + [None] # make sure len(msg) > 1
|
|
||||||
result = (ERRORPREFIX + msg[0], msg[1], ['InternalError', str(err),
|
|
||||||
{'exception': formatException(),
|
|
||||||
'traceback': formatExtendedStack()}])
|
|
||||||
print('--------------------')
|
|
||||||
print(formatException())
|
|
||||||
print('--------------------')
|
|
||||||
print(formatExtendedTraceback(sys.exc_info()))
|
|
||||||
print('====================')
|
|
||||||
else:
|
|
||||||
try:
|
|
||||||
result = serverobj.dispatcher.handle_request(self, msg)
|
|
||||||
except SECoPError as err:
|
|
||||||
result = (ERRORPREFIX + msg[0], msg[1], [err.name, str(err),
|
|
||||||
{'exception': formatException(),
|
|
||||||
'traceback': formatExtendedStack()}])
|
|
||||||
except Exception as err:
|
|
||||||
# create Error Obj instead
|
|
||||||
result = (ERRORPREFIX + msg[0], msg[1], ['InternalError', repr(err),
|
|
||||||
{'exception': formatException(),
|
|
||||||
'traceback': formatExtendedStack()}])
|
|
||||||
print('--------------------')
|
|
||||||
print(formatException())
|
|
||||||
print('--------------------')
|
|
||||||
print(formatExtendedTraceback(sys.exc_info()))
|
|
||||||
print('====================')
|
|
||||||
|
|
||||||
if not result:
|
|
||||||
self.log.error('empty result upon msg %s', repr(msg))
|
|
||||||
if result[0].startswith(ERRORPREFIX) and not detailed_errors:
|
|
||||||
# strip extra information
|
|
||||||
result[2][2].clear()
|
|
||||||
self.send_reply(result)
|
|
||||||
|
|
||||||
def send_reply(self, data):
|
def send_reply(self, data):
|
||||||
"""send reply
|
"""send reply
|
||||||
@ -156,18 +111,9 @@ class TCPRequestHandler(socketserver.BaseRequestHandler):
|
|||||||
self.log.error('ERROR in send_reply %r', e)
|
self.log.error('ERROR in send_reply %r', e)
|
||||||
self.running = False
|
self.running = False
|
||||||
|
|
||||||
def finish(self):
|
def format(self):
|
||||||
"""called when handle() terminates, i.e. the socket closed"""
|
return f'from {format_address(self.client_address)}'
|
||||||
self.log.info('closing connection from %s', format_address(self.client_address))
|
|
||||||
# notify dispatcher
|
|
||||||
self.server.dispatcher.remove_connection(self)
|
|
||||||
# close socket
|
|
||||||
try:
|
|
||||||
self.request.shutdown(socket.SHUT_RDWR)
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
finally:
|
|
||||||
self.request.close()
|
|
||||||
|
|
||||||
class DualStackTCPServer(socketserver.ThreadingTCPServer):
|
class DualStackTCPServer(socketserver.ThreadingTCPServer):
|
||||||
"""Subclassed to provide IPv6 capabilities as socketserver only uses IPv4"""
|
"""Subclassed to provide IPv6 capabilities as socketserver only uses IPv4"""
|
||||||
@ -230,12 +176,3 @@ class TCPServer(DualStackTCPServer):
|
|||||||
if ntry:
|
if ntry:
|
||||||
self.log.warning('tried again %d times after "Address already in use"', ntry)
|
self.log.warning('tried again %d times after "Address already in use"', ntry)
|
||||||
self.log.info("TCPServer initiated")
|
self.log.info("TCPServer initiated")
|
||||||
|
|
||||||
|
|
||||||
def format_address(addr):
|
|
||||||
if len(addr) == 2:
|
|
||||||
return '%s:%d' % addr
|
|
||||||
address, port = addr[0:2]
|
|
||||||
if address.startswith('::ffff'):
|
|
||||||
return '%s:%d' % (address[7:], port)
|
|
||||||
return '[%s]:%d' % (address, port)
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user