improve error handling in SecopClient

- catch more errors
- improve behaviour on closing connections

Change-Id: I3e2ed1b3b01b8151bb709d5a3735716742e0eec6
Reviewed-on: https://forge.frm2.tum.de/review/c/sine2020/secop/playground/+/23123
Tested-by: JenkinsCodeReview <bjoern_pedersen@frm2.tum.de>
Reviewed-by: Enrico Faulhaber <enrico.faulhaber@frm2.tum.de>
Reviewed-by: Markus Zolliker <markus.zolliker@psi.ch>
This commit is contained in:
zolliker 2020-05-20 16:12:07 +02:00
parent a25cb3dce5
commit 3261e5e5ff

View File

@ -292,8 +292,9 @@ class SecopClient(ProxyClient):
def __rxthread(self): def __rxthread(self):
noactivity = 0 noactivity = 0
while self._running: try:
try: while self._running:
# may raise ConnectionClosed
reply = self.io.readline() reply = self.io.readline()
if reply is None: if reply is None:
noactivity += 1 noactivity += 1
@ -301,57 +302,61 @@ class SecopClient(ProxyClient):
# send ping to check if the connection is still alive # send ping to check if the connection is still alive
self.queue_request(HEARTBEATREQUEST, str(noactivity)) self.queue_request(HEARTBEATREQUEST, str(noactivity))
continue continue
except ConnectionClosed: noactivity = 0
break action, ident, data = decode_msg(reply)
noactivity = 0 if ident == '.':
action, ident, data = decode_msg(reply) ident = None
if ident == '.': if action in UPDATE_MESSAGES:
ident = None module_param = self.internal.get(ident, None)
if action in UPDATE_MESSAGES: if module_param is None and ':' not in ident:
module_param = self.internal.get(ident, None) # allow missing ':value'/':target'
if module_param is None and ':' not in ident: if action == WRITEREPLY:
# allow missing ':value'/':target' module_param = self.internal.get(ident + ':target', None)
if action == WRITEREPLY: else:
module_param = self.internal.get(ident + ':target', None) module_param = self.internal.get(ident + ':value', None)
else: if module_param is not None:
module_param = self.internal.get(ident + ':value', None) if action.startswith(ERRORPREFIX):
if module_param is not None: timestamp = data[2].get('t', None)
readerror = secop.errors.make_secop_error(*data[0:2])
value = None
else:
timestamp = data[1].get('t', None)
value = data[0]
readerror = None
module, param = module_param
try:
self.updateValue(module, param, value, timestamp, readerror)
except KeyError:
pass # ignore updates of unknown parameters
if action in (EVENTREPLY, ERRORPREFIX + EVENTREPLY):
continue
try:
key = action, ident
entry = self.active_requests.pop(key)
except KeyError:
if action.startswith(ERRORPREFIX): if action.startswith(ERRORPREFIX):
timestamp = data[2].get('t', None) try:
readerror = secop.errors.make_secop_error(*data[0:2]) key = REQUEST2REPLY[action[len(ERRORPREFIX):]], ident
value = None except KeyError:
key = None
entry = self.active_requests.pop(key, None)
else: else:
timestamp = data[1].get('t', None) # this may be a response to the last unknown request
value = data[0]
readerror = None
module, param = module_param
self.updateValue(module, param, value, timestamp, readerror)
if action in (EVENTREPLY, ERRORPREFIX + EVENTREPLY):
continue
try:
key = action, ident
entry = self.active_requests.pop(key)
except KeyError:
if action.startswith(ERRORPREFIX):
try:
key = REQUEST2REPLY[action[len(ERRORPREFIX):]], ident
except KeyError:
key = None key = None
entry = self.active_requests.pop(key, None) entry = self.active_requests.pop(key, None)
else: if entry is None:
# this may be a response to the last unknown request self._unhandled_message(action, ident, data)
key = None continue
entry = self.active_requests.pop(key, None) entry[2] = action, ident, data
if entry is None: entry[1].set() # trigger event
self._unhandled_message(action, ident, data) while not self.pending.empty():
continue # let the TX thread sort out which entry to treat
entry[2] = action, ident, data # this may have bad performance, but happens rarely
entry[1].set() # trigger event self.txq.put(self.pending.get())
while not self.pending.empty(): except ConnectionClosed:
# let the TX thread sort out which entry to treat pass
# this may have bad performance, but happens rarely except Exception as e:
self.txq.put(self.pending.get()) self.log.error('rxthread ended with %s' % e)
self._rxthread = None self._rxthread = None
self.disconnect(False) self.disconnect(False)
if self._shutdown: if self._shutdown:
@ -409,6 +414,11 @@ class SecopClient(ProxyClient):
self._connthread.join() self._connthread.join()
self._connthread = None self._connthread = None
self.disconnect_time = time.time() self.disconnect_time = time.time()
try: # make sure txq does not block
while not self.txq.empty():
self.txq.get(False)
except Exception:
pass
if self._txthread: if self._txthread:
self.txq.put(None) # shutdown marker self.txq.put(None) # shutdown marker
self._txthread.join() self._txthread.join()
@ -498,7 +508,7 @@ class SecopClient(ProxyClient):
self.connect() # make sure we are connected self.connect() # make sure we are connected
# the last item is for the reply # the last item is for the reply
entry = [request, Event(), None] entry = [request, Event(), None]
self.txq.put(entry) self.txq.put(entry, timeout=3)
return entry return entry
def get_reply(self, entry): def get_reply(self, entry):