diff --git a/secop/client/__init__.py b/secop/client/__init__.py index 82b1c04..7b8a402 100644 --- a/secop/client/__init__.py +++ b/secop/client/__init__.py @@ -292,8 +292,9 @@ class SecopClient(ProxyClient): def __rxthread(self): noactivity = 0 - while self._running: - try: + try: + while self._running: + # may raise ConnectionClosed reply = self.io.readline() if reply is None: noactivity += 1 @@ -301,57 +302,61 @@ class SecopClient(ProxyClient): # send ping to check if the connection is still alive self.queue_request(HEARTBEATREQUEST, str(noactivity)) continue - except ConnectionClosed: - break - noactivity = 0 - action, ident, data = decode_msg(reply) - if ident == '.': - ident = None - if action in UPDATE_MESSAGES: - module_param = self.internal.get(ident, None) - if module_param is None and ':' not in ident: - # allow missing ':value'/':target' - if action == WRITEREPLY: - module_param = self.internal.get(ident + ':target', None) - else: - module_param = self.internal.get(ident + ':value', None) - if module_param is not None: + noactivity = 0 + action, ident, data = decode_msg(reply) + if ident == '.': + ident = None + if action in UPDATE_MESSAGES: + module_param = self.internal.get(ident, None) + if module_param is None and ':' not in ident: + # allow missing ':value'/':target' + if action == WRITEREPLY: + module_param = self.internal.get(ident + ':target', None) + else: + module_param = self.internal.get(ident + ':value', None) + if module_param is not None: + if action.startswith(ERRORPREFIX): + timestamp = data[2].get('t', None) + readerror = secop.errors.make_secop_error(*data[0:2]) + value = None + else: + timestamp = data[1].get('t', None) + value = data[0] + readerror = None + module, param = module_param + try: + self.updateValue(module, param, value, timestamp, readerror) + except KeyError: + pass # ignore updates of unknown parameters + if action in (EVENTREPLY, ERRORPREFIX + EVENTREPLY): + continue + try: + key = action, ident + entry = self.active_requests.pop(key) + except KeyError: if action.startswith(ERRORPREFIX): - timestamp = data[2].get('t', None) - readerror = secop.errors.make_secop_error(*data[0:2]) - value = None + try: + key = REQUEST2REPLY[action[len(ERRORPREFIX):]], ident + except KeyError: + key = None + entry = self.active_requests.pop(key, None) else: - timestamp = data[1].get('t', None) - value = data[0] - readerror = None - module, param = module_param - self.updateValue(module, param, value, timestamp, readerror) - if action in (EVENTREPLY, ERRORPREFIX + EVENTREPLY): - continue - try: - key = action, ident - entry = self.active_requests.pop(key) - except KeyError: - if action.startswith(ERRORPREFIX): - try: - key = REQUEST2REPLY[action[len(ERRORPREFIX):]], ident - except KeyError: + # this may be a response to the last unknown request key = None - entry = self.active_requests.pop(key, None) - else: - # this may be a response to the last unknown request - key = None - entry = self.active_requests.pop(key, None) - if entry is None: - self._unhandled_message(action, ident, data) - continue - entry[2] = action, ident, data - entry[1].set() # trigger event - while not self.pending.empty(): - # let the TX thread sort out which entry to treat - # this may have bad performance, but happens rarely - self.txq.put(self.pending.get()) - + entry = self.active_requests.pop(key, None) + if entry is None: + self._unhandled_message(action, ident, data) + continue + entry[2] = action, ident, data + entry[1].set() # trigger event + while not self.pending.empty(): + # let the TX thread sort out which entry to treat + # this may have bad performance, but happens rarely + self.txq.put(self.pending.get()) + except ConnectionClosed: + pass + except Exception as e: + self.log.error('rxthread ended with %s' % e) self._rxthread = None self.disconnect(False) if self._shutdown: @@ -409,6 +414,11 @@ class SecopClient(ProxyClient): self._connthread.join() self._connthread = None self.disconnect_time = time.time() + try: # make sure txq does not block + while not self.txq.empty(): + self.txq.get(False) + except Exception: + pass if self._txthread: self.txq.put(None) # shutdown marker self._txthread.join() @@ -498,7 +508,7 @@ class SecopClient(ProxyClient): self.connect() # make sure we are connected # the last item is for the reply entry = [request, Event(), None] - self.txq.put(entry) + self.txq.put(entry, timeout=3) return entry def get_reply(self, entry):