rework tcp server

motivation: a thread creating a lot of messages like a polling loop
with very short polling frequency or a fast polling connection might
monopolize the output of message over receiving new messages.
In addition, the current design has a latency of 0.3 sec for the
output of asynchronous replies.

Anyway, the output queue is just extending the network output buffer,
which is usally big enough.

- change the name of 'queue_async_reply' to 'send_reply'.
  This method anyway was not only used for async replies.

- send_reply is directly sending the reply instead of putting into the
  queue. It will slow down the calling thread, if the output buffer
  is full, which is desired behaviour.

Change-Id: I305669be2f7c027355b43421432f32be9c166ed4
Reviewed-on: https://forge.frm2.tum.de/review/c/sine2020/secop/playground/+/23119
Tested-by: JenkinsCodeReview <bjoern_pedersen@frm2.tum.de>
Reviewed-by: Enrico Faulhaber <enrico.faulhaber@frm2.tum.de>
Reviewed-by: Markus Zolliker <markus.zolliker@psi.ch>
This commit is contained in:
2020-05-20 15:20:19 +02:00
parent 31ae0a88b4
commit f7a6ba8b5b
3 changed files with 35 additions and 54 deletions

View File

@ -149,6 +149,7 @@ class AsynTcp(AsynConn):
def send(self, data): def send(self, data):
"""send data (bytes!)""" """send data (bytes!)"""
# remark: will raise socket.timeout when output buffer is full and blocked for 1 sec
self.connection.sendall(data) self.connection.sendall(data)
def flush_recv(self): def flush_recv(self):

View File

@ -25,8 +25,8 @@
Interface to the service offering part: Interface to the service offering part:
- 'handle_request(connectionobj, data)' handles incoming request - 'handle_request(connectionobj, data)' handles incoming request
it returns the (sync) reply, and it may call 'queue_async_reply(data)' it returns the (sync) reply, and it may call 'send_reply(data)'
on the connectionobj on the connectionobj or on activated connections
- 'add_connection(connectionobj)' registers new connection - 'add_connection(connectionobj)' registers new connection
- 'remove_connection(connectionobj)' removes now longer functional connection - 'remove_connection(connectionobj)' removes now longer functional connection
@ -98,7 +98,7 @@ class Dispatcher:
# all generic subscribers # all generic subscribers
listeners.update(self._active_connections) listeners.update(self._active_connections)
for conn in listeners: for conn in listeners:
conn.queue_async_reply(msg) conn.send_reply(msg)
def announce_update(self, modulename, pname, pobj): def announce_update(self, modulename, pname, pobj):
"""called by modules param setters to notify subscribers of new values """called by modules param setters to notify subscribers of new values
@ -275,7 +275,8 @@ class Dispatcher:
def handle_request(self, conn, msg): def handle_request(self, conn, msg):
"""handles incoming request """handles incoming request
will call 'queue_async_reply(data)' on conn or return reply will return return reply, may send replies to conn or
activated connections in addition
""" """
self.log.debug('Dispatcher: handling msg: %s' % repr(msg)) self.log.debug('Dispatcher: handling msg: %s' % repr(msg))
@ -360,11 +361,11 @@ class Dispatcher:
for modulename, pname in modules: for modulename, pname in modules:
moduleobj = self._modules.get(modulename, None) moduleobj = self._modules.get(modulename, None)
if pname: if pname:
conn.queue_async_reply(make_update(modulename, moduleobj.parameters[pname])) conn.send_reply(make_update(modulename, moduleobj.parameters[pname]))
continue continue
for pobj in moduleobj.accessibles.values(): for pobj in moduleobj.accessibles.values():
if isinstance(pobj, Parameter) and pobj.export: if isinstance(pobj, Parameter) and pobj.export:
conn.queue_async_reply(make_update(modulename, pobj)) conn.send_reply(make_update(modulename, pobj))
return (ENABLEEVENTSREPLY, specifier, None) if specifier else (ENABLEEVENTSREPLY, None, None) return (ENABLEEVENTSREPLY, specifier, None) if specifier else (ENABLEEVENTSREPLY, None, None)
def handle_deactivate(self, conn, specifier, data): def handle_deactivate(self, conn, specifier, data):

View File

@ -23,8 +23,8 @@
import sys import sys
import socket import socket
import collections
import socketserver import socketserver
import threading
from secop.datatypes import StringType, BoolType from secop.datatypes import StringType, BoolType
from secop.errors import SECoPError from secop.errors import SECoPError
@ -40,18 +40,13 @@ DEF_PORT = 10767
MESSAGE_READ_SIZE = 1024 MESSAGE_READ_SIZE = 1024
HELP = HELPREQUEST.encode() HELP = HELPREQUEST.encode()
class OutputBufferOverflow(Exception):
pass
class TCPRequestHandler(socketserver.BaseRequestHandler): class TCPRequestHandler(socketserver.BaseRequestHandler):
def setup(self): def setup(self):
self.log = self.server.log self.log = self.server.log
# Queue of msgObjects to send self.running = True
self._queue = collections.deque() # do not use maxlen, as items might get lost self.send_lock = threading.Lock()
# self.framing = self.server.framingCLS()
# self.encoding = self.server.encodingCLS()
def handle(self): def handle(self):
"""handle a new tcp-connection""" """handle a new tcp-connection"""
@ -59,6 +54,7 @@ class TCPRequestHandler(socketserver.BaseRequestHandler):
mysocket = self.request mysocket = self.request
clientaddr = self.client_address clientaddr = self.client_address
serverobj = self.server serverobj = self.server
self.log.info("handling new connection from %s:%d" % clientaddr) self.log.info("handling new connection from %s:%d" % clientaddr)
data = b'' data = b''
@ -68,28 +64,9 @@ class TCPRequestHandler(socketserver.BaseRequestHandler):
# copy relevant settings from Interface # copy relevant settings from Interface
detailed_errors = serverobj.detailed_errors detailed_errors = serverobj.detailed_errors
mysocket.settimeout(.3) mysocket.settimeout(1)
# mysocket.setblocking(False)
# start serving # start serving
while True: while self.running:
# send replys first, then listen for requests, timing out after 0.1s
while self._queue:
# put message into encoder to get frame(s)
# put frame(s) into framer to get bytestring
# send bytestring
outmsg = self._queue.popleft()
if not outmsg:
outmsg = ('error', 'InternalError', ['<unknown origin>', 'trying to send none-data', {}])
if len(outmsg) > 3:
outmsg = ('error', 'InternalError', ['<unknown origin>', 'bad message format', {'msg': outmsg}])
outdata = encode_msg_frame(*outmsg)
try:
mysocket.sendall(outdata)
except Exception as e:
self.log.error('error on sendall: %r', e)
return
# XXX: improve: use polling/select here?
try: try:
newdata = mysocket.recv(MESSAGE_READ_SIZE) newdata = mysocket.recv(MESSAGE_READ_SIZE)
if not newdata: if not newdata:
@ -101,14 +78,13 @@ class TCPRequestHandler(socketserver.BaseRequestHandler):
except socket.error as e: except socket.error as e:
self.log.exception(e) self.log.exception(e)
return return
# XXX: should use select instead of busy polling
if not data: if not data:
continue continue
# put data into (de-) framer, # put data into (de-) framer,
# put frames into (de-) coder and if a message appear, # put frames into (de-) coder and if a message appear,
# call dispatcher.handle_request(self, message) # call dispatcher.handle_request(self, message)
# dispatcher will queue the reply before returning # dispatcher will queue the reply before returning
while True: while self.running:
origin, data = get_msg(data) origin, data = get_msg(data)
if origin is None: if origin is None:
break # no more messages to process break # no more messages to process
@ -116,9 +92,9 @@ class TCPRequestHandler(socketserver.BaseRequestHandler):
if origin in (HELP, b''): # empty string -> send help message if origin in (HELP, b''): # empty string -> send help message
for idx, line in enumerate(HelpMessage.splitlines()): for idx, line in enumerate(HelpMessage.splitlines()):
# not sending HELPREPLY here, as there should be only one reply for every request # not sending HELPREPLY here, as there should be only one reply for every request
self.queue_async_reply(('_', '%d' % (idx+1), line)) self.send_reply(('_', '%d' % (idx+1), line))
# ident matches request # ident matches request
self.queue_async_reply((HELPREPLY, None, None)) self.send_reply((HELPREPLY, None, None))
continue continue
try: try:
msg = decode_msg(origin) msg = decode_msg(origin)
@ -137,8 +113,6 @@ class TCPRequestHandler(socketserver.BaseRequestHandler):
else: else:
try: try:
result = serverobj.dispatcher.handle_request(self, msg) result = serverobj.dispatcher.handle_request(self, msg)
except OutputBufferOverflow:
raise
except SECoPError as err: except SECoPError as err:
result = (ERRORPREFIX + msg[0], msg[1], [err.name, str(err), result = (ERRORPREFIX + msg[0], msg[1], [err.name, str(err),
{'exception': formatException(), {'exception': formatException(),
@ -159,19 +133,24 @@ class TCPRequestHandler(socketserver.BaseRequestHandler):
if result[0].startswith(ERRORPREFIX) and not detailed_errors: if result[0].startswith(ERRORPREFIX) and not detailed_errors:
# strip extra information # strip extra information
result[2][2].clear() result[2][2].clear()
self.queue_async_reply(result) self.send_reply(result)
def queue_async_reply(self, data): def send_reply(self, data):
"""called by dispatcher for async data units""" """send reply
if data:
# avoid queue growing to infinity. the needed size of the queue might be stops recv loop on error (including timeout when output buffer full for more than 1 sec)
# a multiple of the total number of parameters -> use a big number """
if len(self._queue) > 10000: if not data:
self.log.error('output message buffer overflow') self.log.error('should not reply empty data!')
raise OutputBufferOverflow() return
self._queue.append(data) outdata = encode_msg_frame(*data)
else: with self.send_lock:
self.log.error('should async_queue empty data!') if self.running:
try:
self.request.sendall(outdata)
except Exception as e:
self.log.error('ERROR in send_reply %r', e)
self.running = False
def finish(self): def finish(self):
"""called when handle() terminates, i.e. the socket closed""" """called when handle() terminates, i.e. the socket closed"""
@ -199,7 +178,7 @@ class TCPServer(socketserver.ThreadingTCPServer):
default=False, export=False), default=False, export=False),
} }
def __init__(self, name, logger, options, srv): # pylint: disable=super-init-not-called def __init__(self, name, logger, options, srv):
self.dispatcher = srv.dispatcher self.dispatcher = srv.dispatcher
self.name = name self.name = name
self.log = logger self.log = logger