From 1169e0cd09cb7f71188b11dd577fe7e306b1fa82 Mon Sep 17 00:00:00 2001 From: Markus Zolliker Date: Fri, 8 Mar 2024 15:59:16 +0100 Subject: [PATCH] improve sea interface Change-Id: I58fb4b10ef9466f90e4cd58b6c67bcfb11c493e3 --- frappy_psi/sea.py | 288 ++++++++++++++++++++++------------------------ 1 file changed, 137 insertions(+), 151 deletions(-) diff --git a/frappy_psi/sea.py b/frappy_psi/sea.py index 6d4cf4c..be259ba 100644 --- a/frappy_psi/sea.py +++ b/frappy_psi/sea.py @@ -39,12 +39,13 @@ from os.path import expanduser, join, exists from frappy.client import ProxyClient from frappy.datatypes import ArrayOf, BoolType, \ EnumType, FloatRange, IntRange, StringType -from frappy.errors import ConfigError, HardwareError, secop_error, CommunicationFailedError +from frappy.core import IDLE, BUSY, ERROR +from frappy.errors import ConfigError, HardwareError, CommunicationFailedError from frappy.lib import generalConfig, mkthread from frappy.lib.asynconn import AsynConn, ConnectionClosed -from frappy.modules import Attached, Command, Done, Drivable, \ +from frappy.modulebase import Done +from frappy.modules import Attached, Command, Drivable, \ Module, Parameter, Property, Readable, Writable -from frappy.protocol.dispatcher import make_update CFG_HEADER = """Node('%(config)s.sea.psi.ch', @@ -107,7 +108,6 @@ class SeaClient(ProxyClient, Module): service = Property("main/stick/addons", StringType(), default='') visibility = 'expert' default_json_file = {} - _connect_thread = None _instance = None _last_connect = 0 @@ -124,6 +124,8 @@ class SeaClient(ProxyClient, Module): self.shutdown = False self.path2param = {} self._write_lock = threading.Lock() + self._connect_thread = None + self._connected = False config = opts.get('config') if isinstance(config, dict): config = config['value'] @@ -135,14 +137,11 @@ class SeaClient(ProxyClient, Module): Module.__init__(self, name, log, opts, srv) def doPoll(self): - if not self.asynio and time.time() > self._last_connect + 10: - with self._write_lock: - # make sure no more connect thread is running - if self._connect_thread and self._connect_thread.isAlive(): - return - if not self._last_connect: - self.log.info('reconnect to SEA %s', self.service) - self._connect_thread = mkthread(self._connect, None) + if not self._connected and time.time() > self._last_connect + 10: + if not self._last_connect: + self.log.info('reconnect to SEA %s', self.service) + if self._connect_thread is None: + self._connect_thread = mkthread(self._connect) def register_obj(self, module, obj): self.objects.add(obj) @@ -150,99 +149,105 @@ class SeaClient(ProxyClient, Module): self.path2param.setdefault(k, []).extend(v) self.register_callback(module.name, module.updateEvent) - def _connect(self, started_callback): - self.asynio = None - if self.syncio: - # trigger syncio reconnect in self.request() - try: - self.syncio.disconnect() - except Exception: - pass - self.syncio = None - self._last_connect = time.time() - if self._instance: - try: - from servicemanager import SeaManager # pylint: disable=import-outside-toplevel - SeaManager().do_start(self._instance) - except ImportError: - pass - if '//' not in self.uri: - self.uri = 'tcp://' + self.uri - self.asynio = AsynConn(self.uri) - reply = self.asynio.readline() - if reply != b'OK': - raise CommunicationFailedError('reply %r should be "OK"' % reply) - for _ in range(2): - self.asynio.writeline(b'Spy 007') - reply = self.asynio.readline() - if reply == b'Login OK': - break - else: - raise CommunicationFailedError('reply %r should be "Login OK"' % reply) - result = self.request('frappy_config %s %s' % (self.service, self.config)) - if result.startswith('ERROR:'): - raise CommunicationFailedError(f'reply from frappy_config: {result}') - # frappy_async_client switches to the json protocol (better for updates) - self.asynio.writeline(b'frappy_async_client') - self.asynio.writeline(('get_all_param ' + ' '.join(self.objects)).encode()) - self._connect_thread = None - mkthread(self._rxthread, started_callback) - - def request(self, command, quiet=False): - """send a request and wait for reply""" - with self._write_lock: - if not self.syncio or not self.syncio.connection: - if not self.asynio or not self.asynio.connection: - try: - self._connect_thread.join() - except AttributeError: - pass - # let doPoll do the reconnect - self.pollInfo.trigger(True) - raise ConnectionClosed('disconnected - reconnect later') - self.syncio = AsynConn(self.uri) - assert self.syncio.readline() == b'OK' - self.syncio.writeline(b'seauser seaser') - assert self.syncio.readline() == b'Login OK' - self.log.info('connected to %s', self.uri) - try: - self.syncio.flush_recv() - ft = 'fulltransAct' if quiet else 'fulltransact' - self.syncio.writeline(('%s %s' % (ft, command)).encode()) - result = None - deadline = time.time() + 10 - while time.time() < deadline: - reply = self.syncio.readline() - if reply is None: - continue - reply = reply.decode() - if reply.startswith('TRANSACTIONSTART'): - result = [] - continue - if reply == 'TRANSACTIONFINISHED': - if result is None: - self.log.info('missing TRANSACTIONSTART on: %s', command) - return '' - if not result: - return '' - return '\n'.join(result) - if result is None: - self.log.info('swallow: %s', reply) - continue - if not result: - result = [reply.split('=', 1)[-1]] - else: - result.append(reply) - except ConnectionClosed: + def _connect(self): + try: + if self.syncio: try: self.syncio.disconnect() except Exception: pass - self.syncio = None - raise - raise TimeoutError('no response within 10s') + self._last_connect = time.time() + if self._instance: + try: + from servicemanager import SeaManager # pylint: disable=import-outside-toplevel + SeaManager().do_start(self._instance) + except ImportError: + pass + if '//' not in self.uri: + self.uri = 'tcp://' + self.uri + self.asynio = AsynConn(self.uri) + reply = self.asynio.readline() + if reply != b'OK': + raise CommunicationFailedError('reply %r should be "OK"' % reply) + for _ in range(2): + self.asynio.writeline(b'Spy 007') + reply = self.asynio.readline() + if reply == b'Login OK': + break + else: + raise CommunicationFailedError('reply %r should be "Login OK"' % reply) + self.syncio = AsynConn(self.uri) + assert self.syncio.readline() == b'OK' + self.syncio.writeline(b'seauser seaser') + assert self.syncio.readline() == b'Login OK' + self.log.info('connected to %s', self.uri) - def _rxthread(self, started_callback): + result = self.raw_request('frappy_config %s %s' % (self.service, self.config)) + if result.startswith('ERROR:'): + raise CommunicationFailedError(f'reply from frappy_config: {result}') + # frappy_async_client switches to the json protocol (better for updates) + self.asynio.writeline(b'frappy_async_client') + self.asynio.writeline(('get_all_param ' + ' '.join(self.objects)).encode()) + self._connected = True + mkthread(self._rxthread) + finally: + self._connect_thread = None + + def request(self, command, quiet=False): + with self._write_lock: + if not self._connected: + if self._connect_thread is None: + # let doPoll do the reconnect + self.pollInfo.trigger(True) + raise ConnectionClosed('disconnected - reconnect is tried later') + return self.raw_request(command, quiet) + + def raw_request(self, command, quiet=False): + """send a request and wait for reply""" + try: + self.syncio.flush_recv() + ft = 'fulltransAct' if quiet else 'fulltransact' + self.syncio.writeline(('%s %s' % (ft, command)).encode()) + result = None + deadline = time.time() + 10 + while time.time() < deadline: + reply = self.syncio.readline() + if reply is None: + continue + reply = reply.decode() + if reply.startswith('TRANSACTIONSTART'): + result = [] + continue + if reply == 'TRANSACTIONFINISHED': + if result is None: + self.log.info('missing TRANSACTIONSTART on: %s', command) + return '' + if not result: + return '' + return '\n'.join(result) + if result is None: + self.log.info('swallow: %s', reply) + continue + if not result: + result = [reply.split('=', 1)[-1]] + else: + result.append(reply) + raise TimeoutError('no response within 10s') + except ConnectionClosed: + self.close_connections() + raise + + def close_connections(self): + connections = self.syncio, self.asynio + self._connected = False + self.syncio = self.asynio = None + for conn in connections: + try: + conn.disconnect() + except Exception: + pass + + def _rxthread(self): recheck = None while not self.shutdown: if recheck and time.time() > recheck: @@ -258,11 +263,7 @@ class SeaClient(ProxyClient, Module): if reply is None: continue except ConnectionClosed: - try: - self.asynio.disconnect() - except Exception: - pass - self.asynio = None + self.close_connections() break try: msg = json.loads(reply) @@ -289,9 +290,6 @@ class SeaClient(ProxyClient, Module): data = msg['data'] if flag == 'finish' and obj == 'get_all_param': # first updates have finished - if started_callback: - started_callback() - started_callback = None continue if flag != 'hdbevent': if obj not in ('frappy_async_client', 'get_all_param'): @@ -352,7 +350,7 @@ class SeaClient(ProxyClient, Module): class SeaConfigCreator(SeaClient): def startModule(self, start_events): """save objects (and sub-objects) description and exit""" - self._connect(None) + self._connect() reply = self.request('describe_all') reply = ''.join('' if line.startswith('WARNING') else line for line in reply.split('\n')) description, reply = json.loads(reply) @@ -644,22 +642,7 @@ class SeaModule(Module): if upd: upd(value, timestamp, readerror) return - try: - pobj = self.parameters[parameter] - except KeyError: - self.log.error('do not know %s:%s', self.name, parameter) - raise - pobj.timestamp = timestamp - # should be done here: deal with clock differences - if not readerror: - try: - pobj.value = value # store the value even in case of a validation error - pobj.value = pobj.datatype(value) - except Exception as e: - readerror = secop_error(e) - pobj.readerror = readerror - if pobj.export: - self.secNode.srv.dispatcher.broadcast_event(make_update(self.name, pobj)) + self.announceUpdate(parameter, value, readerror, timestamp) def initModule(self): self.io.register_obj(self, self.sea_object) @@ -670,20 +653,35 @@ class SeaModule(Module): class SeaReadable(SeaModule, Readable): + _readerror = None + _status = IDLE, '' + + def update_value(self, value, timestamp, readerror): + # make sure status is always ERROR when reading value fails + self._readerror = readerror + if readerror: + self.read_status() # forced ERROR status + self.announceUpdate('value', value, readerror, timestamp) + else: # order is important + self.value = value # includes announceUpdate + self.read_status() # send event for ordinary self._status def update_status(self, value, timestamp, readerror): if readerror: - value = repr(readerror) + value = f'{readerror.name} - {readerror}' if value == '': - self.status = (self.Status.IDLE, '') + self._status = IDLE, '' else: - self.status = (self.Status.ERROR, value) + self._status = ERROR, value + self.read_status() def read_status(self): - return self.status + if self._readerror: + return ERROR, f'{self._readerror.name} - {self._readerror}' + return self._status -class SeaWritable(SeaModule, Writable): +class SeaWritable(SeaReadable, Writable): def read_value(self): return self.target @@ -693,20 +691,13 @@ class SeaWritable(SeaModule, Writable): self.value = value -class SeaDrivable(SeaModule, Drivable): - _sea_status = '' +class SeaDrivable(SeaReadable, Drivable): _is_running = 0 def earlyInit(self): super().earlyInit() self._run_event = threading.Event() - def read_status(self): - return self.status - - # def read_target(self): - # return self.target - def write_target(self, value): self._run_event.clear() self.io.query(f'run {self.sea_object} {value}') @@ -714,25 +705,20 @@ class SeaDrivable(SeaModule, Drivable): self.log.warn('target changed but is_running stays 0') return value - def update_status(self, value, timestamp, readerror): - if not readerror: - self._sea_status = value - self.updateStatus() - def update_is_running(self, value, timestamp, readerror): if not readerror: self._is_running = value - self.updateStatus() + self.read_status() if value: self._run_event.set() - def updateStatus(self): - if self._sea_status: - self.status = (self.Status.ERROR, self._sea_status) - elif self._is_running: - self.status = (self.Status.BUSY, 'driving') - else: - self.status = (self.Status.IDLE, '') + def read_status(self): + status = super().read_status() + if self._is_running: + if status[0] >= ERROR: + return ERROR, 'BUSY + ' + status[1] + return BUSY, 'driving' + return status def updateTarget(self, module, parameter, value, timestamp, readerror): if value is not None: