frappy_psi.sea: auto connect

on both .ssynio and /syncio try to reconnect after failure
This commit is contained in:
l_samenv 2023-06-26 14:45:53 +02:00 committed by Markus Zolliker
parent e36ff9bc41
commit 8e7e70c50e

View File

@ -111,6 +111,7 @@ class SeaClient(ProxyClient, Module):
_connect_thread = None
_service_manager = None
_instance = None
_last_connect = 0
def __init__(self, name, log, opts, srv):
nodename = srv.node_cfg.get('name') or srv.node_cfg.get('equipment_id')
@ -135,6 +136,10 @@ class SeaClient(ProxyClient, Module):
ProxyClient.__init__(self)
Module.__init__(self, name, log, opts, srv)
def doPoll(self):
if not self.asynio and time.time() > self._last_connect + 10:
self._connect_thread = mkthread(self._connect, None)
def register_obj(self, module, obj):
self.objects.add(obj)
for k, v in module.path2param.items():
@ -146,6 +151,7 @@ class SeaClient(ProxyClient, Module):
self._connect_thread = mkthread(self._connect, start_events.get_trigger())
def _connect(self, started_callback):
self._last_connect = time.time()
if self._instance:
if not self._service_manager:
if self._service_manager is None:
@ -192,36 +198,36 @@ class SeaClient(ProxyClient, Module):
self.syncio.writeline(b'seauser seaser')
assert self.syncio.readline() == b'Login OK'
self.log.info('connected to %s', self.uri)
self.syncio.flush_recv()
ft = 'fulltransAct' if quiet else 'fulltransact'
self.syncio.writeline(('%s %s' % (ft, command)).encode())
result = None
deadline = time.time() + 10
while time.time() < deadline:
try:
try:
self.syncio.flush_recv()
ft = 'fulltransAct' if quiet else 'fulltransact'
self.syncio.writeline(('%s %s' % (ft, command)).encode())
result = None
deadline = time.time() + 10
while time.time() < deadline:
reply = self.syncio.readline()
if reply is None:
continue
except ConnectionClosed:
break
reply = reply.decode()
if reply.startswith('TRANSACTIONSTART'):
result = []
continue
if reply == 'TRANSACTIONFINISHED':
reply = reply.decode()
if reply.startswith('TRANSACTIONSTART'):
result = []
continue
if reply == 'TRANSACTIONFINISHED':
if result is None:
self.log.info('missing TRANSACTIONSTART on: %s', command)
return ''
if not result:
return ''
return '\n'.join(result)
if result is None:
self.log.info('missing TRANSACTIONSTART on: %s', command)
return ''
self.log.info('swallow: %s', reply)
continue
if not result:
return ''
return '\n'.join(result)
if result is None:
self.log.info('swallow: %s', reply)
continue
if not result:
result = [reply.split('=', 1)[-1]]
else:
result.append(reply)
result = [reply.split('=', 1)[-1]]
else:
result.append(reply)
except ConnectionClosed:
self.syncio = None
raise TimeoutError('no response within 10s')
def _rxthread(self, started_callback):
@ -231,6 +237,10 @@ class SeaClient(ProxyClient, Module):
if reply is None:
continue
except ConnectionClosed:
try:
self.asynio.close()
except Exception:
self.asynio = None
break
try:
msg = json.loads(reply)
@ -463,7 +473,6 @@ class SeaModule(Module):
descr['params'].pop(0)
else:
# filter by relative paths
# rel_paths = rel_paths.split()
result = []
is_running = None
for rpath in rel_paths: