From 8e7e70c50e305c9ed49d83946416bdeb84ec5e8c Mon Sep 17 00:00:00 2001 From: l_samenv Date: Mon, 26 Jun 2023 14:45:53 +0200 Subject: [PATCH] frappy_psi.sea: auto connect on both .ssynio and /syncio try to reconnect after failure --- frappy_psi/sea.py | 61 +++++++++++++++++++++++++++-------------------- 1 file changed, 35 insertions(+), 26 deletions(-) diff --git a/frappy_psi/sea.py b/frappy_psi/sea.py index 5bc6276..c437a4e 100644 --- a/frappy_psi/sea.py +++ b/frappy_psi/sea.py @@ -111,6 +111,7 @@ class SeaClient(ProxyClient, Module): _connect_thread = None _service_manager = None _instance = None + _last_connect = 0 def __init__(self, name, log, opts, srv): nodename = srv.node_cfg.get('name') or srv.node_cfg.get('equipment_id') @@ -135,6 +136,10 @@ class SeaClient(ProxyClient, Module): ProxyClient.__init__(self) Module.__init__(self, name, log, opts, srv) + def doPoll(self): + if not self.asynio and time.time() > self._last_connect + 10: + self._connect_thread = mkthread(self._connect, None) + def register_obj(self, module, obj): self.objects.add(obj) for k, v in module.path2param.items(): @@ -146,6 +151,7 @@ class SeaClient(ProxyClient, Module): self._connect_thread = mkthread(self._connect, start_events.get_trigger()) def _connect(self, started_callback): + self._last_connect = time.time() if self._instance: if not self._service_manager: if self._service_manager is None: @@ -192,36 +198,36 @@ class SeaClient(ProxyClient, Module): self.syncio.writeline(b'seauser seaser') assert self.syncio.readline() == b'Login OK' self.log.info('connected to %s', self.uri) - self.syncio.flush_recv() - ft = 'fulltransAct' if quiet else 'fulltransact' - self.syncio.writeline(('%s %s' % (ft, command)).encode()) - result = None - deadline = time.time() + 10 - while time.time() < deadline: - try: + try: + self.syncio.flush_recv() + ft = 'fulltransAct' if quiet else 'fulltransact' + self.syncio.writeline(('%s %s' % (ft, command)).encode()) + result = None + deadline = time.time() + 10 + while time.time() < deadline: reply = self.syncio.readline() if reply is None: continue - except ConnectionClosed: - break - reply = reply.decode() - if reply.startswith('TRANSACTIONSTART'): - result = [] - continue - if reply == 'TRANSACTIONFINISHED': + reply = reply.decode() + if reply.startswith('TRANSACTIONSTART'): + result = [] + continue + if reply == 'TRANSACTIONFINISHED': + if result is None: + self.log.info('missing TRANSACTIONSTART on: %s', command) + return '' + if not result: + return '' + return '\n'.join(result) if result is None: - self.log.info('missing TRANSACTIONSTART on: %s', command) - return '' + self.log.info('swallow: %s', reply) + continue if not result: - return '' - return '\n'.join(result) - if result is None: - self.log.info('swallow: %s', reply) - continue - if not result: - result = [reply.split('=', 1)[-1]] - else: - result.append(reply) + result = [reply.split('=', 1)[-1]] + else: + result.append(reply) + except ConnectionClosed: + self.syncio = None raise TimeoutError('no response within 10s') def _rxthread(self, started_callback): @@ -231,6 +237,10 @@ class SeaClient(ProxyClient, Module): if reply is None: continue except ConnectionClosed: + try: + self.asynio.close() + except Exception: + self.asynio = None break try: msg = json.loads(reply) @@ -463,7 +473,6 @@ class SeaModule(Module): descr['params'].pop(0) else: # filter by relative paths - # rel_paths = rel_paths.split() result = [] is_running = None for rpath in rel_paths: