improve sea interface

Change-Id: I58fb4b10ef9466f90e4cd58b6c67bcfb11c493e3
This commit is contained in:
2024-03-08 15:59:16 +01:00
parent 7d02498b3d
commit 1169e0cd09

View File

@ -39,12 +39,13 @@ from os.path import expanduser, join, exists
from frappy.client import ProxyClient
from frappy.datatypes import ArrayOf, BoolType, \
EnumType, FloatRange, IntRange, StringType
from frappy.errors import ConfigError, HardwareError, secop_error, CommunicationFailedError
from frappy.core import IDLE, BUSY, ERROR
from frappy.errors import ConfigError, HardwareError, CommunicationFailedError
from frappy.lib import generalConfig, mkthread
from frappy.lib.asynconn import AsynConn, ConnectionClosed
from frappy.modules import Attached, Command, Done, Drivable, \
from frappy.modulebase import Done
from frappy.modules import Attached, Command, Drivable, \
Module, Parameter, Property, Readable, Writable
from frappy.protocol.dispatcher import make_update
CFG_HEADER = """Node('%(config)s.sea.psi.ch',
@ -107,7 +108,6 @@ class SeaClient(ProxyClient, Module):
service = Property("main/stick/addons", StringType(), default='')
visibility = 'expert'
default_json_file = {}
_connect_thread = None
_instance = None
_last_connect = 0
@ -124,6 +124,8 @@ class SeaClient(ProxyClient, Module):
self.shutdown = False
self.path2param = {}
self._write_lock = threading.Lock()
self._connect_thread = None
self._connected = False
config = opts.get('config')
if isinstance(config, dict):
config = config['value']
@ -135,14 +137,11 @@ class SeaClient(ProxyClient, Module):
Module.__init__(self, name, log, opts, srv)
def doPoll(self):
if not self.asynio and time.time() > self._last_connect + 10:
with self._write_lock:
# make sure no more connect thread is running
if self._connect_thread and self._connect_thread.isAlive():
return
if not self._last_connect:
self.log.info('reconnect to SEA %s', self.service)
self._connect_thread = mkthread(self._connect, None)
if not self._connected and time.time() > self._last_connect + 10:
if not self._last_connect:
self.log.info('reconnect to SEA %s', self.service)
if self._connect_thread is None:
self._connect_thread = mkthread(self._connect)
def register_obj(self, module, obj):
self.objects.add(obj)
@ -150,99 +149,105 @@ class SeaClient(ProxyClient, Module):
self.path2param.setdefault(k, []).extend(v)
self.register_callback(module.name, module.updateEvent)
def _connect(self, started_callback):
self.asynio = None
if self.syncio:
# trigger syncio reconnect in self.request()
try:
self.syncio.disconnect()
except Exception:
pass
self.syncio = None
self._last_connect = time.time()
if self._instance:
try:
from servicemanager import SeaManager # pylint: disable=import-outside-toplevel
SeaManager().do_start(self._instance)
except ImportError:
pass
if '//' not in self.uri:
self.uri = 'tcp://' + self.uri
self.asynio = AsynConn(self.uri)
reply = self.asynio.readline()
if reply != b'OK':
raise CommunicationFailedError('reply %r should be "OK"' % reply)
for _ in range(2):
self.asynio.writeline(b'Spy 007')
reply = self.asynio.readline()
if reply == b'Login OK':
break
else:
raise CommunicationFailedError('reply %r should be "Login OK"' % reply)
result = self.request('frappy_config %s %s' % (self.service, self.config))
if result.startswith('ERROR:'):
raise CommunicationFailedError(f'reply from frappy_config: {result}')
# frappy_async_client switches to the json protocol (better for updates)
self.asynio.writeline(b'frappy_async_client')
self.asynio.writeline(('get_all_param ' + ' '.join(self.objects)).encode())
self._connect_thread = None
mkthread(self._rxthread, started_callback)
def request(self, command, quiet=False):
"""send a request and wait for reply"""
with self._write_lock:
if not self.syncio or not self.syncio.connection:
if not self.asynio or not self.asynio.connection:
try:
self._connect_thread.join()
except AttributeError:
pass
# let doPoll do the reconnect
self.pollInfo.trigger(True)
raise ConnectionClosed('disconnected - reconnect later')
self.syncio = AsynConn(self.uri)
assert self.syncio.readline() == b'OK'
self.syncio.writeline(b'seauser seaser')
assert self.syncio.readline() == b'Login OK'
self.log.info('connected to %s', self.uri)
try:
self.syncio.flush_recv()
ft = 'fulltransAct' if quiet else 'fulltransact'
self.syncio.writeline(('%s %s' % (ft, command)).encode())
result = None
deadline = time.time() + 10
while time.time() < deadline:
reply = self.syncio.readline()
if reply is None:
continue
reply = reply.decode()
if reply.startswith('TRANSACTIONSTART'):
result = []
continue
if reply == 'TRANSACTIONFINISHED':
if result is None:
self.log.info('missing TRANSACTIONSTART on: %s', command)
return ''
if not result:
return ''
return '\n'.join(result)
if result is None:
self.log.info('swallow: %s', reply)
continue
if not result:
result = [reply.split('=', 1)[-1]]
else:
result.append(reply)
except ConnectionClosed:
def _connect(self):
try:
if self.syncio:
try:
self.syncio.disconnect()
except Exception:
pass
self.syncio = None
raise
raise TimeoutError('no response within 10s')
self._last_connect = time.time()
if self._instance:
try:
from servicemanager import SeaManager # pylint: disable=import-outside-toplevel
SeaManager().do_start(self._instance)
except ImportError:
pass
if '//' not in self.uri:
self.uri = 'tcp://' + self.uri
self.asynio = AsynConn(self.uri)
reply = self.asynio.readline()
if reply != b'OK':
raise CommunicationFailedError('reply %r should be "OK"' % reply)
for _ in range(2):
self.asynio.writeline(b'Spy 007')
reply = self.asynio.readline()
if reply == b'Login OK':
break
else:
raise CommunicationFailedError('reply %r should be "Login OK"' % reply)
self.syncio = AsynConn(self.uri)
assert self.syncio.readline() == b'OK'
self.syncio.writeline(b'seauser seaser')
assert self.syncio.readline() == b'Login OK'
self.log.info('connected to %s', self.uri)
def _rxthread(self, started_callback):
result = self.raw_request('frappy_config %s %s' % (self.service, self.config))
if result.startswith('ERROR:'):
raise CommunicationFailedError(f'reply from frappy_config: {result}')
# frappy_async_client switches to the json protocol (better for updates)
self.asynio.writeline(b'frappy_async_client')
self.asynio.writeline(('get_all_param ' + ' '.join(self.objects)).encode())
self._connected = True
mkthread(self._rxthread)
finally:
self._connect_thread = None
def request(self, command, quiet=False):
with self._write_lock:
if not self._connected:
if self._connect_thread is None:
# let doPoll do the reconnect
self.pollInfo.trigger(True)
raise ConnectionClosed('disconnected - reconnect is tried later')
return self.raw_request(command, quiet)
def raw_request(self, command, quiet=False):
"""send a request and wait for reply"""
try:
self.syncio.flush_recv()
ft = 'fulltransAct' if quiet else 'fulltransact'
self.syncio.writeline(('%s %s' % (ft, command)).encode())
result = None
deadline = time.time() + 10
while time.time() < deadline:
reply = self.syncio.readline()
if reply is None:
continue
reply = reply.decode()
if reply.startswith('TRANSACTIONSTART'):
result = []
continue
if reply == 'TRANSACTIONFINISHED':
if result is None:
self.log.info('missing TRANSACTIONSTART on: %s', command)
return ''
if not result:
return ''
return '\n'.join(result)
if result is None:
self.log.info('swallow: %s', reply)
continue
if not result:
result = [reply.split('=', 1)[-1]]
else:
result.append(reply)
raise TimeoutError('no response within 10s')
except ConnectionClosed:
self.close_connections()
raise
def close_connections(self):
connections = self.syncio, self.asynio
self._connected = False
self.syncio = self.asynio = None
for conn in connections:
try:
conn.disconnect()
except Exception:
pass
def _rxthread(self):
recheck = None
while not self.shutdown:
if recheck and time.time() > recheck:
@ -258,11 +263,7 @@ class SeaClient(ProxyClient, Module):
if reply is None:
continue
except ConnectionClosed:
try:
self.asynio.disconnect()
except Exception:
pass
self.asynio = None
self.close_connections()
break
try:
msg = json.loads(reply)
@ -289,9 +290,6 @@ class SeaClient(ProxyClient, Module):
data = msg['data']
if flag == 'finish' and obj == 'get_all_param':
# first updates have finished
if started_callback:
started_callback()
started_callback = None
continue
if flag != 'hdbevent':
if obj not in ('frappy_async_client', 'get_all_param'):
@ -352,7 +350,7 @@ class SeaClient(ProxyClient, Module):
class SeaConfigCreator(SeaClient):
def startModule(self, start_events):
"""save objects (and sub-objects) description and exit"""
self._connect(None)
self._connect()
reply = self.request('describe_all')
reply = ''.join('' if line.startswith('WARNING') else line for line in reply.split('\n'))
description, reply = json.loads(reply)
@ -644,22 +642,7 @@ class SeaModule(Module):
if upd:
upd(value, timestamp, readerror)
return
try:
pobj = self.parameters[parameter]
except KeyError:
self.log.error('do not know %s:%s', self.name, parameter)
raise
pobj.timestamp = timestamp
# should be done here: deal with clock differences
if not readerror:
try:
pobj.value = value # store the value even in case of a validation error
pobj.value = pobj.datatype(value)
except Exception as e:
readerror = secop_error(e)
pobj.readerror = readerror
if pobj.export:
self.secNode.srv.dispatcher.broadcast_event(make_update(self.name, pobj))
self.announceUpdate(parameter, value, readerror, timestamp)
def initModule(self):
self.io.register_obj(self, self.sea_object)
@ -670,20 +653,35 @@ class SeaModule(Module):
class SeaReadable(SeaModule, Readable):
_readerror = None
_status = IDLE, ''
def update_value(self, value, timestamp, readerror):
# make sure status is always ERROR when reading value fails
self._readerror = readerror
if readerror:
self.read_status() # forced ERROR status
self.announceUpdate('value', value, readerror, timestamp)
else: # order is important
self.value = value # includes announceUpdate
self.read_status() # send event for ordinary self._status
def update_status(self, value, timestamp, readerror):
if readerror:
value = repr(readerror)
value = f'{readerror.name} - {readerror}'
if value == '':
self.status = (self.Status.IDLE, '')
self._status = IDLE, ''
else:
self.status = (self.Status.ERROR, value)
self._status = ERROR, value
self.read_status()
def read_status(self):
return self.status
if self._readerror:
return ERROR, f'{self._readerror.name} - {self._readerror}'
return self._status
class SeaWritable(SeaModule, Writable):
class SeaWritable(SeaReadable, Writable):
def read_value(self):
return self.target
@ -693,20 +691,13 @@ class SeaWritable(SeaModule, Writable):
self.value = value
class SeaDrivable(SeaModule, Drivable):
_sea_status = ''
class SeaDrivable(SeaReadable, Drivable):
_is_running = 0
def earlyInit(self):
super().earlyInit()
self._run_event = threading.Event()
def read_status(self):
return self.status
# def read_target(self):
# return self.target
def write_target(self, value):
self._run_event.clear()
self.io.query(f'run {self.sea_object} {value}')
@ -714,25 +705,20 @@ class SeaDrivable(SeaModule, Drivable):
self.log.warn('target changed but is_running stays 0')
return value
def update_status(self, value, timestamp, readerror):
if not readerror:
self._sea_status = value
self.updateStatus()
def update_is_running(self, value, timestamp, readerror):
if not readerror:
self._is_running = value
self.updateStatus()
self.read_status()
if value:
self._run_event.set()
def updateStatus(self):
if self._sea_status:
self.status = (self.Status.ERROR, self._sea_status)
elif self._is_running:
self.status = (self.Status.BUSY, 'driving')
else:
self.status = (self.Status.IDLE, '')
def read_status(self):
status = super().read_status()
if self._is_running:
if status[0] >= ERROR:
return ERROR, 'BUSY + ' + status[1]
return BUSY, 'driving'
return status
def updateTarget(self, module, parameter, value, timestamp, readerror):
if value is not None: