diff --git a/secop/lib/asynconn.py b/secop/lib/asynconn.py index 3c45718..2bcb1ff 100644 --- a/secop/lib/asynconn.py +++ b/secop/lib/asynconn.py @@ -149,6 +149,7 @@ class AsynTcp(AsynConn): def send(self, data): """send data (bytes!)""" + # remark: will raise socket.timeout when output buffer is full and blocked for 1 sec self.connection.sendall(data) def flush_recv(self): diff --git a/secop/protocol/dispatcher.py b/secop/protocol/dispatcher.py index 1cff14d..c1577db 100644 --- a/secop/protocol/dispatcher.py +++ b/secop/protocol/dispatcher.py @@ -25,8 +25,8 @@ Interface to the service offering part: - 'handle_request(connectionobj, data)' handles incoming request - it returns the (sync) reply, and it may call 'queue_async_reply(data)' - on the connectionobj + it returns the (sync) reply, and it may call 'send_reply(data)' + on the connectionobj or on activated connections - 'add_connection(connectionobj)' registers new connection - 'remove_connection(connectionobj)' removes now longer functional connection @@ -98,7 +98,7 @@ class Dispatcher: # all generic subscribers listeners.update(self._active_connections) for conn in listeners: - conn.queue_async_reply(msg) + conn.send_reply(msg) def announce_update(self, modulename, pname, pobj): """called by modules param setters to notify subscribers of new values @@ -275,7 +275,8 @@ class Dispatcher: def handle_request(self, conn, msg): """handles incoming request - will call 'queue_async_reply(data)' on conn or return reply + will return return reply, may send replies to conn or + activated connections in addition """ self.log.debug('Dispatcher: handling msg: %s' % repr(msg)) @@ -360,11 +361,11 @@ class Dispatcher: for modulename, pname in modules: moduleobj = self._modules.get(modulename, None) if pname: - conn.queue_async_reply(make_update(modulename, moduleobj.parameters[pname])) + conn.send_reply(make_update(modulename, moduleobj.parameters[pname])) continue for pobj in moduleobj.accessibles.values(): if isinstance(pobj, Parameter) and pobj.export: - conn.queue_async_reply(make_update(modulename, pobj)) + conn.send_reply(make_update(modulename, pobj)) return (ENABLEEVENTSREPLY, specifier, None) if specifier else (ENABLEEVENTSREPLY, None, None) def handle_deactivate(self, conn, specifier, data): diff --git a/secop/protocol/interface/tcp.py b/secop/protocol/interface/tcp.py index d4f6267..58ccc33 100644 --- a/secop/protocol/interface/tcp.py +++ b/secop/protocol/interface/tcp.py @@ -23,8 +23,8 @@ import sys import socket -import collections import socketserver +import threading from secop.datatypes import StringType, BoolType from secop.errors import SECoPError @@ -40,18 +40,13 @@ DEF_PORT = 10767 MESSAGE_READ_SIZE = 1024 HELP = HELPREQUEST.encode() -class OutputBufferOverflow(Exception): - pass - class TCPRequestHandler(socketserver.BaseRequestHandler): def setup(self): self.log = self.server.log - # Queue of msgObjects to send - self._queue = collections.deque() # do not use maxlen, as items might get lost -# self.framing = self.server.framingCLS() -# self.encoding = self.server.encodingCLS() + self.running = True + self.send_lock = threading.Lock() def handle(self): """handle a new tcp-connection""" @@ -59,6 +54,7 @@ class TCPRequestHandler(socketserver.BaseRequestHandler): mysocket = self.request clientaddr = self.client_address serverobj = self.server + self.log.info("handling new connection from %s:%d" % clientaddr) data = b'' @@ -68,28 +64,9 @@ class TCPRequestHandler(socketserver.BaseRequestHandler): # copy relevant settings from Interface detailed_errors = serverobj.detailed_errors - mysocket.settimeout(.3) - # mysocket.setblocking(False) + mysocket.settimeout(1) # start serving - while True: - # send replys first, then listen for requests, timing out after 0.1s - while self._queue: - # put message into encoder to get frame(s) - # put frame(s) into framer to get bytestring - # send bytestring - outmsg = self._queue.popleft() - if not outmsg: - outmsg = ('error', 'InternalError', ['', 'trying to send none-data', {}]) - if len(outmsg) > 3: - outmsg = ('error', 'InternalError', ['', 'bad message format', {'msg': outmsg}]) - outdata = encode_msg_frame(*outmsg) - try: - mysocket.sendall(outdata) - except Exception as e: - self.log.error('error on sendall: %r', e) - return - - # XXX: improve: use polling/select here? + while self.running: try: newdata = mysocket.recv(MESSAGE_READ_SIZE) if not newdata: @@ -101,14 +78,13 @@ class TCPRequestHandler(socketserver.BaseRequestHandler): except socket.error as e: self.log.exception(e) return - # XXX: should use select instead of busy polling if not data: continue # put data into (de-) framer, # put frames into (de-) coder and if a message appear, # call dispatcher.handle_request(self, message) # dispatcher will queue the reply before returning - while True: + while self.running: origin, data = get_msg(data) if origin is None: break # no more messages to process @@ -116,9 +92,9 @@ class TCPRequestHandler(socketserver.BaseRequestHandler): if origin in (HELP, b''): # empty string -> send help message for idx, line in enumerate(HelpMessage.splitlines()): # not sending HELPREPLY here, as there should be only one reply for every request - self.queue_async_reply(('_', '%d' % (idx+1), line)) + self.send_reply(('_', '%d' % (idx+1), line)) # ident matches request - self.queue_async_reply((HELPREPLY, None, None)) + self.send_reply((HELPREPLY, None, None)) continue try: msg = decode_msg(origin) @@ -137,8 +113,6 @@ class TCPRequestHandler(socketserver.BaseRequestHandler): else: try: result = serverobj.dispatcher.handle_request(self, msg) - except OutputBufferOverflow: - raise except SECoPError as err: result = (ERRORPREFIX + msg[0], msg[1], [err.name, str(err), {'exception': formatException(), @@ -159,19 +133,24 @@ class TCPRequestHandler(socketserver.BaseRequestHandler): if result[0].startswith(ERRORPREFIX) and not detailed_errors: # strip extra information result[2][2].clear() - self.queue_async_reply(result) + self.send_reply(result) - def queue_async_reply(self, data): - """called by dispatcher for async data units""" - if data: - # avoid queue growing to infinity. the needed size of the queue might be - # a multiple of the total number of parameters -> use a big number - if len(self._queue) > 10000: - self.log.error('output message buffer overflow') - raise OutputBufferOverflow() - self._queue.append(data) - else: - self.log.error('should async_queue empty data!') + def send_reply(self, data): + """send reply + + stops recv loop on error (including timeout when output buffer full for more than 1 sec) + """ + if not data: + self.log.error('should not reply empty data!') + return + outdata = encode_msg_frame(*data) + with self.send_lock: + if self.running: + try: + self.request.sendall(outdata) + except Exception as e: + self.log.error('ERROR in send_reply %r', e) + self.running = False def finish(self): """called when handle() terminates, i.e. the socket closed""" @@ -199,7 +178,7 @@ class TCPServer(socketserver.ThreadingTCPServer): default=False, export=False), } - def __init__(self, name, logger, options, srv): # pylint: disable=super-init-not-called + def __init__(self, name, logger, options, srv): self.dispatcher = srv.dispatcher self.name = name self.log = logger