improve error handling on client connections
- send a heartbeat, if no events for 5 sec. an interrupted connection (not closed by the other end) may not be detected for a long time when nothing is sent + make the error reply on a non SECoPEror more verbose e.g. "KeyError('foo')" instead of just "foo" + allow cfg file without nodeinterface + shorter logger name in HasIodev Change-Id: I6b1ff23f9bf8c96feb25af44935596437b7d726f Reviewed-on: https://forge.frm2.tum.de/review/c/sine2020/secop/playground/+/23098 Tested-by: JenkinsCodeReview <bjoern_pedersen@frm2.tum.de> Reviewed-by: Markus Zolliker <markus.zolliker@psi.ch>
This commit is contained in:
@ -30,10 +30,11 @@ from collections import defaultdict
|
||||
|
||||
from secop.lib import mkthread, formatExtendedTraceback, formatExtendedStack
|
||||
from secop.lib.asynconn import AsynConn, ConnectionClosed
|
||||
from secop.datatypes import get_datatype, EnumType
|
||||
from secop.datatypes import get_datatype
|
||||
from secop.protocol.interface import encode_msg_frame, decode_msg
|
||||
from secop.protocol.messages import REQUEST2REPLY, ERRORPREFIX, EVENTREPLY, WRITEREQUEST, WRITEREPLY, \
|
||||
READREQUEST, READREPLY, IDENTREQUEST, IDENTPREFIX, ENABLEEVENTSREQUEST, COMMANDREQUEST, DESCRIPTIONREQUEST
|
||||
READREQUEST, READREPLY, IDENTREQUEST, IDENTPREFIX, ENABLEEVENTSREQUEST, COMMANDREQUEST, \
|
||||
DESCRIPTIONREQUEST, HEARTBEATREQUEST
|
||||
import secop.errors
|
||||
import secop.params
|
||||
|
||||
@ -290,13 +291,19 @@ class SecopClient(ProxyClient):
|
||||
self.disconnect(False)
|
||||
|
||||
def __rxthread(self):
|
||||
noactivity = 0
|
||||
while self._running:
|
||||
try:
|
||||
reply = self.io.readline()
|
||||
if reply is None:
|
||||
noactivity += 1
|
||||
if noactivity % 5 == 0:
|
||||
# send ping to check if the connection is still alive
|
||||
self.queue_request(HEARTBEATREQUEST, str(noactivity))
|
||||
continue
|
||||
except ConnectionClosed:
|
||||
break
|
||||
noactivity = 0
|
||||
action, ident, data = decode_msg(reply)
|
||||
if ident == '.':
|
||||
ident = None
|
||||
|
@ -159,14 +159,20 @@ class AsynTcp(AsynConn):
|
||||
return b''.join(data)
|
||||
|
||||
def recv(self):
|
||||
"""return bytes received within 1 sec"""
|
||||
"""return bytes in the recv buffer
|
||||
|
||||
or bytes received within 1 sec
|
||||
"""
|
||||
try:
|
||||
data = self.connection.recv(8192)
|
||||
if data:
|
||||
return data
|
||||
except socket.timeout:
|
||||
except (socket.timeout, TimeoutError):
|
||||
# timeout while waiting
|
||||
return b''
|
||||
# note that when no data is sent on a connection, an interruption might
|
||||
# not be detected within a reasonable time. sending a heartbeat should
|
||||
# help in this case.
|
||||
raise ConnectionClosed() # marks end of connection
|
||||
|
||||
|
||||
|
@ -145,7 +145,7 @@ class TCPRequestHandler(socketserver.BaseRequestHandler):
|
||||
'traceback': formatExtendedStack()}])
|
||||
except Exception as err:
|
||||
# create Error Obj instead
|
||||
result = (ERRORPREFIX + msg[0], msg[1], ['InternalError', str(err),
|
||||
result = (ERRORPREFIX + msg[0], msg[1], ['InternalError', repr(err),
|
||||
{'exception': formatException(),
|
||||
'traceback': formatExtendedStack()}])
|
||||
print('--------------------')
|
||||
|
@ -100,8 +100,8 @@ class Server:
|
||||
cfgdict = self.loadCfgFile(filename)
|
||||
ambiguous_sections |= set(merged_cfg) & set(cfgdict)
|
||||
merged_cfg.update(cfgdict)
|
||||
self.node_cfg = merged_cfg.pop('NODE')
|
||||
self.interface_cfg = merged_cfg.pop('INTERFACE')
|
||||
self.node_cfg = merged_cfg.pop('NODE', {})
|
||||
self.interface_cfg = merged_cfg.pop('INTERFACE', {})
|
||||
self.module_cfg = merged_cfg
|
||||
if interface:
|
||||
ambiguous_sections.discard('interface')
|
||||
@ -109,6 +109,8 @@ class Server:
|
||||
self.node_cfg['name'] = name
|
||||
self.node_cfg['id'] = cfgfiles
|
||||
self.interface_cfg['uri'] = str(interface)
|
||||
elif 'uri' not in self.interface_cfg:
|
||||
raise ConfigError('missing interface uri')
|
||||
if ambiguous_sections:
|
||||
self.log.warning('ambiguous sections in %s: %r' % (cfgfiles, tuple(ambiguous_sections)))
|
||||
self._cfgfiles = cfgfiles
|
||||
|
@ -208,7 +208,7 @@ class HasIodev(Module):
|
||||
opts = {'uri': self.uri, 'description': 'communication device for %s' % name,
|
||||
'export': False}
|
||||
ioname = name + '_iodev'
|
||||
iodev = self.iodevClass(ioname, self.log.getChild(ioname), opts, srv)
|
||||
iodev = self.iodevClass(ioname, srv.log.getChild(ioname), opts, srv)
|
||||
srv.modules[ioname] = iodev
|
||||
self.setProperty('iodev', ioname)
|
||||
|
||||
|
Reference in New Issue
Block a user