frappy_psi.sea: try to reconnect on failure

both .asynio and .syncio connection should be tried to reopen.

Change-Id: I836ede1e5fff6f7ac670c92e769da8ec834d2b12
Reviewed-on: https://forge.frm2.tum.de/review/c/secop/frappy/+/31449
Tested-by: Jenkins Automated Tests <pedersen+jenkins@frm2.tum.de>
Reviewed-by: Markus Zolliker <markus.zolliker@psi.ch>
This commit is contained in:
2023-06-26 16:18:47 +02:00
parent b47e5d270c
commit 9ece4f797b

View File

@ -111,6 +111,7 @@ class SeaClient(ProxyClient, Module):
_connect_thread = None
_service_manager = None
_instance = None
_last_connect = 0
def __init__(self, name, log, opts, srv):
nodename = srv.node_cfg.get('name') or srv.node_cfg.get('equipment_id')
@ -135,6 +136,10 @@ class SeaClient(ProxyClient, Module):
ProxyClient.__init__(self)
Module.__init__(self, name, log, opts, srv)
def doPoll(self):
if not self.asynio and time.time() > self._last_connect + 10:
self._connect_thread = mkthread(self._connect, None)
def register_obj(self, module, obj):
self.objects.add(obj)
for k, v in module.path2param.items():
@ -146,6 +151,7 @@ class SeaClient(ProxyClient, Module):
self._connect_thread = mkthread(self._connect, start_events.get_trigger())
def _connect(self, started_callback):
self._last_connect = time.time()
if self._instance:
if not self._service_manager:
if self._service_manager is None:
@ -192,36 +198,40 @@ class SeaClient(ProxyClient, Module):
self.syncio.writeline(b'seauser seaser')
assert self.syncio.readline() == b'Login OK'
self.log.info('connected to %s', self.uri)
self.syncio.flush_recv()
ft = 'fulltransAct' if quiet else 'fulltransact'
self.syncio.writeline(('%s %s' % (ft, command)).encode())
result = None
deadline = time.time() + 10
while time.time() < deadline:
try:
try:
self.syncio.flush_recv()
ft = 'fulltransAct' if quiet else 'fulltransact'
self.syncio.writeline(('%s %s' % (ft, command)).encode())
result = None
deadline = time.time() + 10
while time.time() < deadline:
reply = self.syncio.readline()
if reply is None:
continue
except ConnectionClosed:
break
reply = reply.decode()
if reply.startswith('TRANSACTIONSTART'):
result = []
continue
if reply == 'TRANSACTIONFINISHED':
reply = reply.decode()
if reply.startswith('TRANSACTIONSTART'):
result = []
continue
if reply == 'TRANSACTIONFINISHED':
if result is None:
self.log.info('missing TRANSACTIONSTART on: %s', command)
return ''
if not result:
return ''
return '\n'.join(result)
if result is None:
self.log.info('missing TRANSACTIONSTART on: %s', command)
return ''
self.log.info('swallow: %s', reply)
continue
if not result:
return ''
return '\n'.join(result)
if result is None:
self.log.info('swallow: %s', reply)
continue
if not result:
result = [reply.split('=', 1)[-1]]
else:
result.append(reply)
result = [reply.split('=', 1)[-1]]
else:
result.append(reply)
except ConnectionClosed:
try:
self.syncio.disconnect()
except Exception:
pass
self.syncio = None
raise TimeoutError('no response within 10s')
def _rxthread(self, started_callback):
@ -231,6 +241,11 @@ class SeaClient(ProxyClient, Module):
if reply is None:
continue
except ConnectionClosed:
try:
self.asynio.disconnect()
except Exception:
pass
self.asynio = None
break
try:
msg = json.loads(reply)