diff --git a/secop/protocol/dispatcher.py b/secop/protocol/dispatcher.py index acfe279..4371728 100644 --- a/secop/protocol/dispatcher.py +++ b/secop/protocol/dispatcher.py @@ -24,10 +24,10 @@ Interface to the service offering part: - 'handle_request(connectionobj, data)' handles incoming request - will call 'queue_request(data)' on connectionobj before returning + it returns the (sync) reply, and it may call 'queue_async_reply(data)' + on the connectionobj - 'add_connection(connectionobj)' registers new connection - 'remove_connection(connectionobj)' removes now longer functional connection - - may at any time call 'queue_async_request(connobj, data)' on the connobj Interface to the modules: - add_module(modulename, moduleobj, export=True) registers a new module under the @@ -286,7 +286,7 @@ class Dispatcher(object): def handle_request(self, conn, msg): """handles incoming request - will call 'queue_async_request(data)' on conn or return reply + will call 'queue_async_reply(data)' on conn or return reply """ self.log.debug(u'Dispatcher: handling msg: %s' % repr(msg)) diff --git a/secop/protocol/interface/__init__.py b/secop/protocol/interface/__init__.py index 85a0fb0..067ef46 100644 --- a/secop/protocol/interface/__init__.py +++ b/secop/protocol/interface/__init__.py @@ -32,8 +32,10 @@ def encode_msg_frame(action, specifier=None, data=None): data may be an json-yfied python object""" action = action.encode('utf-8') if specifier is None: - # implicit: data is None - return b''.join((action, EOL)) + if data is None: + return b''.join((action, EOL)) + # error_activate might have no specifier + specifier = '' specifier = specifier.encode('utf-8') if data: data = json.dumps(data).encode('utf-8') diff --git a/secop/protocol/interface/tcp.py b/secop/protocol/interface/tcp.py index 6a95f41..a32d684 100644 --- a/secop/protocol/interface/tcp.py +++ b/secop/protocol/interface/tcp.py @@ -49,14 +49,15 @@ CR = b'\r' SPACE = b' ' - +class OutputBufferOverflow(Exception): + pass class TCPRequestHandler(socketserver.BaseRequestHandler): def setup(self): self.log = self.server.log # Queue of msgObjects to send - self._queue = collections.deque(maxlen=100) + self._queue = collections.deque() # do not use maxlen, as items might get lost # self.framing = self.server.framingCLS() # self.encoding = self.server.encodingCLS() @@ -92,7 +93,8 @@ class TCPRequestHandler(socketserver.BaseRequestHandler): outdata = encode_msg_frame(*outmsg) try: mysocket.sendall(outdata) - except Exception: + except Exception as e: + self.log.error('error on sendall: %r', e) return # XXX: improve: use polling/select here? @@ -139,6 +141,8 @@ class TCPRequestHandler(socketserver.BaseRequestHandler): else: try: result = serverobj.dispatcher.handle_request(self, msg) + except OutputBufferOverflow: + raise except SECoPError as err: result = (ERRORPREFIX + msg[0], msg[1], [err.name, str(err), {'exception': formatException(), @@ -164,6 +168,11 @@ class TCPRequestHandler(socketserver.BaseRequestHandler): def queue_async_reply(self, data): """called by dispatcher for async data units""" if data: + # avoid queue growing to infinity. the needed size of the queue might be + # a multiple of the total number of parameters -> use a big number + if len(self._queue) > 10000: + self.log.error('output message buffer overflow') + raise OutputBufferOverflow() self._queue.append(data) else: self.log.error('should async_queue empty data!')