From 2561e820860053c0c275948cebc0ab28c4ef9549 Mon Sep 17 00:00:00 2001 From: Markus Zolliker Date: Fri, 5 Apr 2024 10:56:05 +0200 Subject: [PATCH] frappy.client: improve error handling more - implement a 'handleError' callback - SecopClient.handleError is registered as a callback and by default logs errors up to a limited number of times - the cache is customized to return an item indicating an undefined value, helping to avoid follow up errors. + frappy.errors: cosmetic fix + frappy.client: cosmetic changes Change-Id: I4614e1d27c7aea3a1a722176c6be55f8563597cd Reviewed-on: https://forge.frm2.tum.de/review/c/secop/frappy/+/33429 Tested-by: Jenkins Automated Tests Reviewed-by: Enrico Faulhaber Reviewed-by: Markus Zolliker --- frappy/client/__init__.py | 180 ++++++++++++++++++++++---------------- frappy/errors.py | 2 +- 2 files changed, 107 insertions(+), 75 deletions(-) diff --git a/frappy/client/__init__.py b/frappy/client/__init__.py index c828016..bacb7c7 100644 --- a/frappy/client/__init__.py +++ b/frappy/client/__init__.py @@ -29,10 +29,10 @@ import time from collections import defaultdict from threading import Event, RLock, current_thread -import frappy.errors import frappy.params +from frappy.errors import make_secop_error, SECoPError, WrongTypeError, ProtocolError, ProgrammingError from frappy.datatypes import get_datatype -from frappy.lib import mkthread, formatExtendedStack +from frappy.lib import mkthread from frappy.lib.asynconn import AsynConn, ConnectionClosed from frappy.protocol.interface import decode_msg, encode_msg_frame from frappy.protocol.messages import COMMANDREQUEST, \ @@ -43,7 +43,7 @@ from frappy.protocol.messages import COMMANDREQUEST, \ # replies to be handled for cache UPDATE_MESSAGES = {EVENTREPLY, READREPLY, WRITEREPLY, ERRORPREFIX + READREQUEST, ERRORPREFIX + EVENTREPLY} -VERSIONFMT= re.compile(r'^[^,]*?ISSE[^,]*,SECoP,') +VERSIONFMT = re.compile(r'^[^,]*?ISSE[^,]*,SECoP,') class UnregisterCallback(Exception): @@ -83,6 +83,12 @@ class CallbackObject: def unhandledMessage(self, action, ident, data): """called on an unhandled message""" + def handleError(self, exc): + """called on errors handling messages + + :param exc: the exception raised (= sys.exception()) + """ + def nodeStateChange(self, online, state): """called when the state of the connection changes @@ -105,19 +111,13 @@ class CacheItem(tuple): inheriting from tuple: compatible with old previous version of cache """ def __new__(cls, value, timestamp=None, readerror=None, datatype=None): - if readerror: - assert isinstance(readerror, Exception) - else: - try: - value = datatype.import_value(value) - except (KeyError, ValueError, AttributeError): - readerror = ValueError(f'can not import {value!r} as {datatype!r}') - value = None obj = tuple.__new__(cls, (value, timestamp, readerror)) - try: - obj.format_value = datatype.format_value - except AttributeError: - obj.format_value = lambda value, unit=None: str(value) + if datatype: + try: + # override default method + obj.format_value = datatype.format_value + except AttributeError: + pass return obj @property @@ -144,6 +144,11 @@ class CacheItem(tuple): return repr(self[2]) return self.format_value(self[0]) + @staticmethod + def format_value(value, unit=None): + """typically overridden with datatype.format_value""" + return str(value) + def __repr__(self): args = (self.value,) if self.timestamp: @@ -153,11 +158,22 @@ class CacheItem(tuple): return f'CacheItem{repr(args)}' +class Cache(dict): + class Undefined(Exception): + def __repr__(self): + return '' + + undefined = CacheItem(None, None, Undefined()) + + def __missing__(self, key): + return self.undefined + + class ProxyClient: """common functionality for proxy clients""" CALLBACK_NAMES = {'updateEvent', 'updateItem', 'descriptiveDataChange', - 'nodeStateChange', 'unhandledMessage'} + 'nodeStateChange', 'unhandledMessage', 'handleError'} online = False # connected or reconnecting since a short time state = 'disconnected' # further possible values: 'connecting', 'reconnecting', 'connected' log = None @@ -165,7 +181,7 @@ class ProxyClient: def __init__(self): self.callbacks = {cbname: defaultdict(list) for cbname in self.CALLBACK_NAMES} # caches (module, parameter) = value, timestamp, readerror (internal names!) - self.cache = {} + self.cache = Cache() # dict returning Cache.undefined for missing keys def register_callback(self, key, *args, **kwds): """register callback functions @@ -244,16 +260,18 @@ class ProxyClient: except UnregisterCallback: cblist.remove(cbfunc) except Exception as e: - # the programmer should catch all errors in callbacks - # if not, the log will be flooded with errors - if self.log: - self.log.exception('error %r calling %s%r', e, cbfunc.__name__, args) + if cbname != 'handleError': + try: + e.args = [f'error in callback {cbname}{args}: {e}'] + self.callback(None, 'handleError', e) + except Exception: + pass return bool(cblist) def updateValue(self, module, param, value, timestamp, readerror): self.callback(None, 'updateEvent', module, param, value, timestamp, readerror) self.callback(module, 'updateEvent', module, param, value, timestamp, readerror) - self.callback((module, param), 'updateEvent', module, param,value, timestamp, readerror) + self.callback((module, param), 'updateEvent', module, param, value, timestamp, readerror) class SecopClient(ProxyClient): @@ -268,6 +286,8 @@ class SecopClient(ProxyClient): descriptive_data = {} modules = {} _last_error = None + _update_error_count = 0 + _max_error_count = 10 def __init__(self, uri, log=Logger): super().__init__() @@ -283,6 +303,7 @@ class SecopClient(ProxyClient): self._lock = RLock() self._shutdown = Event() self.cleanup = [] + self.register_callback(None, self.handleError) def __del__(self): try: @@ -298,6 +319,7 @@ class SecopClient(ProxyClient): with self._lock: if self.io: return + self._shutdown.clear() self.txq = queue.Queue(30) self.pending = queue.Queue(30) self.active_requests.clear() @@ -389,43 +411,37 @@ class SecopClient(ProxyClient): continue self.log.debug('RX: %r', reply) noactivity = 0 - action, ident, data = decode_msg(reply) - if ident == '.': - ident = None - if action in UPDATE_MESSAGES: - module_param = self.internal.get(ident, None) - if module_param is None and ':' not in (ident or ''): - # allow missing ':value'/':target' - if action == WRITEREPLY: - module_param = self.internal.get(f'{ident}:target', None) - else: - module_param = self.internal.get(f'{ident}:value', None) - if module_param is not None: - now = time.time() - if action.startswith(ERRORPREFIX): - timestamp = data[2].get('t', now) - readerror = frappy.errors.make_secop_error(*data[0:2]) - value = None - else: - timestamp = data[1].get('t', now) - value = data[0] - readerror = None - module, param = module_param - timestamp = min(now, timestamp) # no timestamps in the future! - try: + try: + action, ident, data = decode_msg(reply) + if ident == '.': + ident = None + if action in UPDATE_MESSAGES: + module_param = self.internal.get(ident, None) + if module_param is None and ':' not in (ident or ''): + # allow missing ':value'/':target' + if action == WRITEREPLY: + module_param = self.internal.get(f'{ident}:target', None) + else: + module_param = self.internal.get(f'{ident}:value', None) + if module_param is not None: + now = time.time() + if action.startswith(ERRORPREFIX): + timestamp = data[2].get('t', now) + readerror = make_secop_error(*data[0:2]) + value = None + else: + timestamp = data[1].get('t', now) + value = data[0] + readerror = None + module, param = module_param + timestamp = min(now, timestamp) # no timestamps in the future! self.updateValue(module, param, value, timestamp, readerror) - except KeyError: - pass # ignore updates of unknown parameters - except Exception as e: - self.log.debug(f'error when updating %s:%s %r', module, param, value) - try: - # catch errors in callback functions - self.updateValue(module, param, None, timestamp, - type(e)(f'{e} - raised on client side')) - except Exception as ee: - self.log.warn(f'can not handle error update %r for %s:%s: %r', e, module, param, ee) - if action in (EVENTREPLY, ERRORPREFIX + EVENTREPLY): - continue + if action in (EVENTREPLY, ERRORPREFIX + EVENTREPLY): + continue + except Exception as e: + e.args = (f'error handling SECoP message {reply!r}: {e}',) + self.callback(None, 'handleError', e) + continue try: key = action, ident entry = self.active_requests.pop(key) @@ -452,17 +468,20 @@ class SecopClient(ProxyClient): except ConnectionClosed: pass except Exception as e: - self.log.error('rxthread ended with %r', e) - self._rxthread = None - self.disconnect(False) - if self._shutdown.is_set(): - return - if self.activate: - self.log.info('try to reconnect to %s', self.uri) - self._connthread = mkthread(self._reconnect) - else: - self.log.warning('%s disconnected', self.uri) - self._set_state(False, 'disconnected') + self._shutdown.set() + self.callback(None, 'handleError', e) + finally: + self._rxthread = None + if self._shutdown.is_set(): + self.disconnect(True) + return + self.disconnect(False) + if self.activate: + self.log.info('try to reconnect to %s', self.uri) + self._connthread = mkthread(self._reconnect) + else: + self.log.warning('%s disconnected', self.uri) + self._set_state(False, 'disconnected') def spawn_connect(self, connected_callback=None): """try to connect in background @@ -590,6 +609,13 @@ class SecopClient(ProxyClient): if not self.callback(None, 'unhandledMessage', action, ident, data): self.log.warning('unhandled message: %s %s %r', action, ident, data) + def handleError(self, exc): + if self._update_error_count < self._max_error_count: + self.log.exception('%s', exc) + self._update_error_count += 1 + if self._update_error_count == self._max_error_count: + self.log.error('disabled reporting of further update errors') + def _set_state(self, online, state=None): # remark: reconnecting is treated as online self.online = online @@ -613,11 +639,13 @@ class SecopClient(ProxyClient): self.cleanup.append(entry) raise TimeoutError('no response within 10s') if not entry[2]: # reply + if self._shutdown.is_set(): + raise ConnectionError('connection shut down') # no cleanup needed as self.active_requests will be cleared on connect raise ConnectionError('connection closed before reply') action, _, data = entry[2] # pylint: disable=unpacking-non-sequence if action.startswith(ERRORPREFIX): - raise frappy.errors.make_secop_error(*data[0:2]) + raise make_secop_error(*data[0:2]) return entry[2] # reply def request(self, action, ident=None, data=None): @@ -632,7 +660,7 @@ class SecopClient(ProxyClient): """forced read over connection""" try: self.request(READREQUEST, self.identifier[module, parameter]) - except frappy.errors.SECoPError: + except SECoPError: # error reply message is already stored as readerror in cache pass return self.cache.get((module, parameter), None) @@ -660,7 +688,7 @@ class SecopClient(ProxyClient): argument = datatype.export_value(argument) else: if argument is not None: - raise frappy.errors.WrongTypeError('command has no argument') + raise WrongTypeError('command has no argument') # pylint: disable=unsubscriptable-object data, qualifiers = self.request(COMMANDREQUEST, self.identifier[module, command], argument)[2] datatype = self.modules[module]['commands'][command]['datatype'].result @@ -669,8 +697,12 @@ class SecopClient(ProxyClient): return data, qualifiers def updateValue(self, module, param, value, timestamp, readerror): - entry = CacheItem(value, timestamp, readerror, - self.modules[module]['parameters'][param]['datatype']) + datatype = self.modules[module]['parameters'][param]['datatype'] + if readerror: + assert isinstance(readerror, Exception) + else: + value = datatype.import_value(value) + entry = CacheItem(value, timestamp, readerror, datatype) self.cache[(module, param)] = entry self.callback(None, 'updateItem', module, param, entry) self.callback(module, 'updateItem', module, param, entry) diff --git a/frappy/errors.py b/frappy/errors.py index fcd6eb1..801ed5f 100644 --- a/frappy/errors.py +++ b/frappy/errors.py @@ -218,7 +218,7 @@ class IsErrorError(SECoPError): class DisabledError(SECoPError): """The requested action can not be performed while the module is disabled""" - name = 'disabled' + name = 'Disabled' class ImpossibleError(SECoPError):