frappy_psi.sea: better timeout behaviour

timeout parameter not only for requests but also for connects
this might avoid blocking situations
This commit is contained in:
l_samenv 2024-05-07 10:23:21 +02:00
parent 76349e38f9
commit 7929c37027

View File

@ -100,7 +100,7 @@ class SeaClient(ProxyClient, Module):
"""connection to SEA"""
uri = Parameter('hostname:portnumber', datatype=StringType(), default='localhost:5000')
timeout = Parameter('timeout', datatype=FloatRange(0), default=10)
timeout = Parameter('timeout for connecting and requests', datatype=FloatRange(0), default=10)
config = Property("""needed SEA configuration, space separated
Example: "ori4.config ori4.stick"
@ -125,7 +125,7 @@ class SeaClient(ProxyClient, Module):
self.path2param = {}
self._write_lock = threading.Lock()
self._connect_thread = None
self._connected = False
self._connected = threading.Event()
config = opts.get('config')
if isinstance(config, dict):
config = config['value']
@ -137,11 +137,12 @@ class SeaClient(ProxyClient, Module):
Module.__init__(self, name, log, opts, srv)
def doPoll(self):
if not self._connected and time.time() > self._last_connect + 10:
if not self._connected.is_set() and time.time() > self._last_connect + self.timeout:
if not self._last_connect:
self.log.info('reconnect to SEA %s', self.service)
if self._connect_thread is None:
self._connect_thread = mkthread(self._connect)
self._connected.wait(self.timeout)
def register_obj(self, module, obj):
self.objects.add(obj)
@ -180,7 +181,6 @@ class SeaClient(ProxyClient, Module):
assert self.syncio.readline() == b'OK'
self.syncio.writeline(b'seauser seaser')
assert self.syncio.readline() == b'Login OK'
self.log.info('connected to %s', self.uri)
result = self.raw_request('frappy_config %s %s' % (self.service, self.config))
if result.startswith('ERROR:'):
@ -188,14 +188,15 @@ class SeaClient(ProxyClient, Module):
# frappy_async_client switches to the json protocol (better for updates)
self.asynio.writeline(b'frappy_async_client')
self.asynio.writeline(('get_all_param ' + ' '.join(self.objects)).encode())
self._connected = True
self.log.info('connected to %s', self.uri)
self._connected.set()
mkthread(self._rxthread)
finally:
self._connect_thread = None
def request(self, command, quiet=False):
with self._write_lock:
if not self._connected:
if not self._connected.is_set():
if self._connect_thread is None:
# let doPoll do the reconnect
self.pollInfo.trigger(True)
@ -209,7 +210,7 @@ class SeaClient(ProxyClient, Module):
ft = 'fulltransAct' if quiet else 'fulltransact'
self.syncio.writeline(('%s %s' % (ft, command)).encode())
result = None
deadline = time.time() + 10
deadline = time.time() + self.timeout
while time.time() < deadline:
reply = self.syncio.readline()
if reply is None:
@ -239,13 +240,13 @@ class SeaClient(ProxyClient, Module):
def close_connections(self):
connections = self.syncio, self.asynio
self._connected = False
self.syncio = self.asynio = None
for conn in connections:
try:
conn.disconnect()
except Exception:
pass
self._connected.clear()
def _rxthread(self):
recheck = None