avoid lost messages after activate message
messages might get lost after activate, when the number of parameters exceeds maxlen of the output queue, as a limited deque silently overwrites messages. first try: use an infinite deque, but slow down appending thread when deque gets too big -> does not work because the appending thread is the same second try: increase maximum a lot, on overflow log an error message and close connection Change-Id: I20376f7e08240dabe43269fa63c596f07e59ddf6 Reviewed-on: https://forge.frm2.tum.de/review/20982 Tested-by: JenkinsCodeReview <bjoern_pedersen@frm2.tum.de> Reviewed-by: Enrico Faulhaber <enrico.faulhaber@frm2.tum.de> Reviewed-by: Markus Zolliker <markus.zolliker@psi.ch>
This commit is contained in:
parent
1e6fa76717
commit
cd1b4cf4e2
@ -24,10 +24,10 @@
|
||||
Interface to the service offering part:
|
||||
|
||||
- 'handle_request(connectionobj, data)' handles incoming request
|
||||
will call 'queue_request(data)' on connectionobj before returning
|
||||
it returns the (sync) reply, and it may call 'queue_async_reply(data)'
|
||||
on the connectionobj
|
||||
- 'add_connection(connectionobj)' registers new connection
|
||||
- 'remove_connection(connectionobj)' removes now longer functional connection
|
||||
- may at any time call 'queue_async_request(connobj, data)' on the connobj
|
||||
|
||||
Interface to the modules:
|
||||
- add_module(modulename, moduleobj, export=True) registers a new module under the
|
||||
@ -286,7 +286,7 @@ class Dispatcher(object):
|
||||
def handle_request(self, conn, msg):
|
||||
"""handles incoming request
|
||||
|
||||
will call 'queue_async_request(data)' on conn or return reply
|
||||
will call 'queue_async_reply(data)' on conn or return reply
|
||||
"""
|
||||
self.log.debug(u'Dispatcher: handling msg: %s' % repr(msg))
|
||||
|
||||
|
@ -32,8 +32,10 @@ def encode_msg_frame(action, specifier=None, data=None):
|
||||
data may be an json-yfied python object"""
|
||||
action = action.encode('utf-8')
|
||||
if specifier is None:
|
||||
# implicit: data is None
|
||||
return b''.join((action, EOL))
|
||||
if data is None:
|
||||
return b''.join((action, EOL))
|
||||
# error_activate might have no specifier
|
||||
specifier = ''
|
||||
specifier = specifier.encode('utf-8')
|
||||
if data:
|
||||
data = json.dumps(data).encode('utf-8')
|
||||
|
@ -49,14 +49,15 @@ CR = b'\r'
|
||||
SPACE = b' '
|
||||
|
||||
|
||||
|
||||
class OutputBufferOverflow(Exception):
|
||||
pass
|
||||
|
||||
class TCPRequestHandler(socketserver.BaseRequestHandler):
|
||||
|
||||
def setup(self):
|
||||
self.log = self.server.log
|
||||
# Queue of msgObjects to send
|
||||
self._queue = collections.deque(maxlen=100)
|
||||
self._queue = collections.deque() # do not use maxlen, as items might get lost
|
||||
# self.framing = self.server.framingCLS()
|
||||
# self.encoding = self.server.encodingCLS()
|
||||
|
||||
@ -92,7 +93,8 @@ class TCPRequestHandler(socketserver.BaseRequestHandler):
|
||||
outdata = encode_msg_frame(*outmsg)
|
||||
try:
|
||||
mysocket.sendall(outdata)
|
||||
except Exception:
|
||||
except Exception as e:
|
||||
self.log.error('error on sendall: %r', e)
|
||||
return
|
||||
|
||||
# XXX: improve: use polling/select here?
|
||||
@ -139,6 +141,8 @@ class TCPRequestHandler(socketserver.BaseRequestHandler):
|
||||
else:
|
||||
try:
|
||||
result = serverobj.dispatcher.handle_request(self, msg)
|
||||
except OutputBufferOverflow:
|
||||
raise
|
||||
except SECoPError as err:
|
||||
result = (ERRORPREFIX + msg[0], msg[1], [err.name, str(err),
|
||||
{'exception': formatException(),
|
||||
@ -164,6 +168,11 @@ class TCPRequestHandler(socketserver.BaseRequestHandler):
|
||||
def queue_async_reply(self, data):
|
||||
"""called by dispatcher for async data units"""
|
||||
if data:
|
||||
# avoid queue growing to infinity. the needed size of the queue might be
|
||||
# a multiple of the total number of parameters -> use a big number
|
||||
if len(self._queue) > 10000:
|
||||
self.log.error('output message buffer overflow')
|
||||
raise OutputBufferOverflow()
|
||||
self._queue.append(data)
|
||||
else:
|
||||
self.log.error('should async_queue empty data!')
|
||||
|
Loading…
x
Reference in New Issue
Block a user