diff --git a/frappy/client/__init__.py b/frappy/client/__init__.py index d12d19e..ba85dc1 100644 --- a/frappy/client/__init__.py +++ b/frappy/client/__init__.py @@ -282,6 +282,7 @@ class SecopClient(ProxyClient): self.nodename = uri self._lock = RLock() self._shutdown = Event() + self.cleanup = [] def __del__(self): try: @@ -297,6 +298,10 @@ class SecopClient(ProxyClient): with self._lock: if self.io: return + self.txq = queue.Queue(30) + self.pending = queue.Queue(30) + self.active_requests.clear() + self.cleanup.clear() if self.online: self._set_state(True, 'reconnecting') else: @@ -368,6 +373,12 @@ class SecopClient(ProxyClient): noactivity = 0 try: while self._running: + while self.cleanup: + entry = self.cleanup.pop() + for key, prev in self.active_requests.items(): + if prev is entry: + self.active_requests.pop(key) + break # may raise ConnectionClosed reply = self.io.readline() if reply is None: @@ -591,8 +602,10 @@ class SecopClient(ProxyClient): def get_reply(self, entry): """wait for reply and return it""" if not entry[1].wait(10): # event + self.cleanup.append(entry) raise TimeoutError('no response within 10s') if not entry[2]: # reply + # no cleanup needed as self.active_requests will be cleared on connect raise ConnectionError('connection closed before reply') action, _, data = entry[2] # pylint: disable=unpacking-non-sequence if action.startswith(ERRORPREFIX):