frappy_psi.sea: various improvments

- always open asynio and syncio connections (conenctions for
  update and command)
- better synchronization when reconnecting using threading.Event
- nicer error messages

Change-Id: Ia435c3ccfa2732be4aa9f24a3b6e8484fab715a3
Reviewed-on: https://forge.frm2.tum.de/review/c/secop/frappy/+/33909
Tested-by: Jenkins Automated Tests <pedersen+jenkins@frm2.tum.de>
Reviewed-by: Markus Zolliker <markus.zolliker@psi.ch>
This commit is contained in:
2024-06-11 17:29:46 +02:00
parent 924a9a2c7f
commit 7dc4cf7029

View File

@@ -39,13 +39,13 @@ from os.path import expanduser, join, exists
from frappy.client import ProxyClient
from frappy.datatypes import ArrayOf, BoolType, \
EnumType, FloatRange, IntRange, StringType
from frappy.errors import ConfigError, HardwareError, secop_error, CommunicationFailedError
from frappy.core import IDLE, BUSY, ERROR
from frappy.errors import ConfigError, HardwareError, CommunicationFailedError
from frappy.lib import generalConfig, mkthread
from frappy.lib.asynconn import AsynConn, ConnectionClosed
from frappy.modulebase import Done
from frappy.modules import Attached, Command, Drivable, \
Module, Parameter, Property, Readable, Writable
from frappy.protocol.dispatcher import make_update
CFG_HEADER = """Node('%(config)s.sea.psi.ch',
@@ -100,7 +100,8 @@ class SeaClient(ProxyClient, Module):
"""connection to SEA"""
uri = Parameter('hostname:portnumber', datatype=StringType(), default='localhost:5000')
timeout = Parameter('timeout', datatype=FloatRange(0), default=10)
timeout = Parameter('timeout for connecting and requests',
datatype=FloatRange(0), default=10)
config = Property("""needed SEA configuration, space separated
Example: "ori4.config ori4.stick"
@@ -108,7 +109,6 @@ class SeaClient(ProxyClient, Module):
service = Property("main/stick/addons", StringType(), default='')
visibility = 'expert'
default_json_file = {}
_connect_thread = None
_instance = None
_last_connect = 0
@@ -125,6 +125,8 @@ class SeaClient(ProxyClient, Module):
self.shutdown = False
self.path2param = {}
self._write_lock = threading.Lock()
self._connect_thread = None
self._connected = threading.Event()
config = opts.get('config')
if isinstance(config, dict):
config = config['value']
@@ -136,14 +138,12 @@ class SeaClient(ProxyClient, Module):
Module.__init__(self, name, log, opts, srv)
def doPoll(self):
if not self.asynio and time.time() > self._last_connect + 10:
with self._write_lock:
# make sure no more connect thread is running
if self._connect_thread and self._connect_thread.isAlive():
return
if not self._last_connect:
self.log.info('reconnect to SEA %s', self.service)
self._connect_thread = mkthread(self._connect, None)
if not self._connected.is_set() and time.time() > self._last_connect + self.timeout:
if not self._last_connect:
self.log.info('reconnect to SEA %s', self.service)
if self._connect_thread is None:
self._connect_thread = mkthread(self._connect)
self._connected.wait(self.timeout)
def register_obj(self, module, obj):
self.objects.add(obj)
@@ -151,99 +151,105 @@ class SeaClient(ProxyClient, Module):
self.path2param.setdefault(k, []).extend(v)
self.register_callback(module.name, module.updateEvent)
def _connect(self, started_callback):
self.asynio = None
if self.syncio:
# trigger syncio reconnect in self.request()
try:
self.syncio.disconnect()
except Exception:
pass
self.syncio = None
self._last_connect = time.time()
if self._instance:
try:
from servicemanager import SeaManager # pylint: disable=import-outside-toplevel
SeaManager().do_start(self._instance)
except ImportError:
pass
if '//' not in self.uri:
self.uri = 'tcp://' + self.uri
self.asynio = AsynConn(self.uri)
reply = self.asynio.readline()
if reply != b'OK':
raise CommunicationFailedError('reply %r should be "OK"' % reply)
for _ in range(2):
self.asynio.writeline(b'Spy 007')
reply = self.asynio.readline()
if reply == b'Login OK':
break
else:
raise CommunicationFailedError('reply %r should be "Login OK"' % reply)
result = self.request('frappy_config %s %s' % (self.service, self.config))
if result.startswith('ERROR:'):
raise CommunicationFailedError(f'reply from frappy_config: {result}')
# frappy_async_client switches to the json protocol (better for updates)
self.asynio.writeline(b'frappy_async_client')
self.asynio.writeline(('get_all_param ' + ' '.join(self.objects)).encode())
self._connect_thread = None
mkthread(self._rxthread, started_callback)
def request(self, command, quiet=False):
"""send a request and wait for reply"""
with self._write_lock:
if not self.syncio or not self.syncio.connection:
if not self.asynio or not self.asynio.connection:
try:
self._connect_thread.join()
except AttributeError:
pass
# let doPoll do the reconnect
self.pollInfo.trigger(True)
raise ConnectionClosed('disconnected - reconnect later')
self.syncio = AsynConn(self.uri)
assert self.syncio.readline() == b'OK'
self.syncio.writeline(b'seauser seaser')
assert self.syncio.readline() == b'Login OK'
self.log.info('connected to %s', self.uri)
try:
self.syncio.flush_recv()
ft = 'fulltransAct' if quiet else 'fulltransact'
self.syncio.writeline(('%s %s' % (ft, command)).encode())
result = None
deadline = time.time() + 10
while time.time() < deadline:
reply = self.syncio.readline()
if reply is None:
continue
reply = reply.decode()
if reply.startswith('TRANSACTIONSTART'):
result = []
continue
if reply == 'TRANSACTIONFINISHED':
if result is None:
self.log.info('missing TRANSACTIONSTART on: %s', command)
return ''
if not result:
return ''
return '\n'.join(result)
if result is None:
self.log.info('swallow: %s', reply)
continue
if not result:
result = [reply.split('=', 1)[-1]]
else:
result.append(reply)
except ConnectionClosed:
def _connect(self):
try:
if self.syncio:
try:
self.syncio.disconnect()
except Exception:
pass
self.syncio = None
raise
raise TimeoutError('no response within 10s')
self._last_connect = time.time()
if self._instance:
try:
from servicemanager import SeaManager # pylint: disable=import-outside-toplevel
SeaManager().do_start(self._instance)
except ImportError:
pass
if '//' not in self.uri:
self.uri = 'tcp://' + self.uri
self.asynio = AsynConn(self.uri)
reply = self.asynio.readline()
if reply != b'OK':
raise CommunicationFailedError('reply %r should be "OK"' % reply)
for _ in range(2):
self.asynio.writeline(b'Spy 007')
reply = self.asynio.readline()
if reply == b'Login OK':
break
else:
raise CommunicationFailedError('reply %r should be "Login OK"' % reply)
self.syncio = AsynConn(self.uri)
assert self.syncio.readline() == b'OK'
self.syncio.writeline(b'seauser seaser')
assert self.syncio.readline() == b'Login OK'
def _rxthread(self, started_callback):
result = self.raw_request('frappy_config %s %s' % (self.service, self.config))
if result.startswith('ERROR:'):
raise CommunicationFailedError(f'reply from frappy_config: {result}')
# frappy_async_client switches to the json protocol (better for updates)
self.asynio.writeline(b'frappy_async_client')
self.asynio.writeline(('get_all_param ' + ' '.join(self.objects)).encode())
self.log.info('connected to %s', self.uri)
self._connected.set()
mkthread(self._rxthread)
finally:
self._connect_thread = None
def request(self, command, quiet=False):
with self._write_lock:
if not self._connected.is_set():
if self._connect_thread is None:
# let doPoll do the reconnect
self.pollInfo.trigger(True)
raise ConnectionClosed('disconnected - reconnect is tried later')
return self.raw_request(command, quiet)
def raw_request(self, command, quiet=False):
"""send a request and wait for reply"""
try:
self.syncio.flush_recv()
ft = 'fulltransAct' if quiet else 'fulltransact'
self.syncio.writeline(('%s %s' % (ft, command)).encode())
result = None
deadline = time.time() + self.timeout
while time.time() < deadline:
reply = self.syncio.readline()
if reply is None:
continue
reply = reply.decode()
if reply.startswith('TRANSACTIONSTART'):
result = []
continue
if reply == 'TRANSACTIONFINISHED':
if result is None:
self.log.info('missing TRANSACTIONSTART on: %s', command)
return ''
if not result:
return ''
return '\n'.join(result)
if result is None:
self.log.info('swallow: %s', reply)
continue
if not result:
result = [reply.split('=', 1)[-1]]
else:
result.append(reply)
raise TimeoutError('no response within 10s')
except ConnectionClosed:
self.close_connections()
raise
def close_connections(self):
connections = self.syncio, self.asynio
self.syncio = self.asynio = None
for conn in connections:
try:
conn.disconnect()
except Exception:
pass
self._connected.clear()
def _rxthread(self):
recheck = None
while not self.shutdown:
if recheck and time.time() > recheck:
@@ -259,11 +265,7 @@ class SeaClient(ProxyClient, Module):
if reply is None:
continue
except ConnectionClosed:
try:
self.asynio.disconnect()
except Exception:
pass
self.asynio = None
self.close_connections()
break
try:
msg = json.loads(reply)
@@ -290,9 +292,6 @@ class SeaClient(ProxyClient, Module):
data = msg['data']
if flag == 'finish' and obj == 'get_all_param':
# first updates have finished
if started_callback:
started_callback()
started_callback = None
continue
if flag != 'hdbevent':
if obj not in ('frappy_async_client', 'get_all_param'):
@@ -353,7 +352,7 @@ class SeaClient(ProxyClient, Module):
class SeaConfigCreator(SeaClient):
def startModule(self, start_events):
"""save objects (and sub-objects) description and exit"""
self._connect(None)
self._connect()
reply = self.request('describe_all')
reply = ''.join('' if line.startswith('WARNING') else line for line in reply.split('\n'))
description, reply = json.loads(reply)
@@ -645,22 +644,7 @@ class SeaModule(Module):
if upd:
upd(value, timestamp, readerror)
return
try:
pobj = self.parameters[parameter]
except KeyError:
self.log.error('do not know %s:%s', self.name, parameter)
raise
pobj.timestamp = timestamp
# should be done here: deal with clock differences
if not readerror:
try:
pobj.value = value # store the value even in case of a validation error
pobj.value = pobj.datatype(value)
except Exception as e:
readerror = secop_error(e)
pobj.readerror = readerror
if pobj.export:
self.secNode.srv.dispatcher.broadcast_event(make_update(self.name, pobj))
self.announceUpdate(parameter, value, readerror, timestamp)
def initModule(self):
self.io.register_obj(self, self.sea_object)
@@ -671,20 +655,35 @@ class SeaModule(Module):
class SeaReadable(SeaModule, Readable):
_readerror = None
_status = IDLE, ''
def update_value(self, value, timestamp, readerror):
# make sure status is always ERROR when reading value fails
self._readerror = readerror
if readerror:
self.read_status() # forced ERROR status
self.announceUpdate('value', value, readerror, timestamp)
else: # order is important
self.value = value # includes announceUpdate
self.read_status() # send event for ordinary self._status
def update_status(self, value, timestamp, readerror):
if readerror:
value = repr(readerror)
value = f'{readerror.name} - {readerror}'
if value == '':
self.status = (self.Status.IDLE, '')
self._status = IDLE, ''
else:
self.status = (self.Status.ERROR, value)
self._status = ERROR, value
self.read_status()
def read_status(self):
return self.status
if self._readerror:
return ERROR, f'{self._readerror.name} - {self._readerror}'
return self._status
class SeaWritable(SeaModule, Writable):
class SeaWritable(SeaReadable, Writable):
def read_value(self):
return self.target
@@ -694,20 +693,13 @@ class SeaWritable(SeaModule, Writable):
self.value = value
class SeaDrivable(SeaModule, Drivable):
_sea_status = ''
class SeaDrivable(SeaReadable, Drivable):
_is_running = 0
def earlyInit(self):
super().earlyInit()
self._run_event = threading.Event()
def read_status(self):
return self.status
# def read_target(self):
# return self.target
def write_target(self, value):
self._run_event.clear()
self.io.query(f'run {self.sea_object} {value}')
@@ -715,25 +707,20 @@ class SeaDrivable(SeaModule, Drivable):
self.log.warn('target changed but is_running stays 0')
return value
def update_status(self, value, timestamp, readerror):
if not readerror:
self._sea_status = value
self.updateStatus()
def update_is_running(self, value, timestamp, readerror):
if not readerror:
self._is_running = value
self.updateStatus()
self.read_status()
if value:
self._run_event.set()
def updateStatus(self):
if self._sea_status:
self.status = (self.Status.ERROR, self._sea_status)
elif self._is_running:
self.status = (self.Status.BUSY, 'driving')
else:
self.status = (self.Status.IDLE, '')
def read_status(self):
status = super().read_status()
if self._is_running:
if status[0] >= ERROR:
return ERROR, 'BUSY + ' + status[1]
return BUSY, 'driving'
return status
def updateTarget(self, module, parameter, value, timestamp, readerror):
if value is not None: