frappy.client: improve error handling more

- implement a 'handleError' callback
- SecopClient.handleError is registered as a callback and by default
  logs errors up to a limited number of times
- the cache is customized to return an item indicating an undefined value,
  helping to avoid follow up errors.
+ frappy.errors: cosmetic fix
+ frappy.client: cosmetic changes

Change-Id: I4614e1d27c7aea3a1a722176c6be55f8563597cd
Reviewed-on: https://forge.frm2.tum.de/review/c/secop/frappy/+/33429
Tested-by: Jenkins Automated Tests <pedersen+jenkins@frm2.tum.de>
Reviewed-by: Enrico Faulhaber <enrico.faulhaber@frm2.tum.de>
Reviewed-by: Markus Zolliker <markus.zolliker@psi.ch>
This commit is contained in:
zolliker 2024-04-05 10:56:05 +02:00
parent 55c96ffe4f
commit 2561e82086
2 changed files with 107 additions and 75 deletions

View File

@ -29,10 +29,10 @@ import time
from collections import defaultdict
from threading import Event, RLock, current_thread
import frappy.errors
import frappy.params
from frappy.errors import make_secop_error, SECoPError, WrongTypeError, ProtocolError, ProgrammingError
from frappy.datatypes import get_datatype
from frappy.lib import mkthread, formatExtendedStack
from frappy.lib import mkthread
from frappy.lib.asynconn import AsynConn, ConnectionClosed
from frappy.protocol.interface import decode_msg, encode_msg_frame
from frappy.protocol.messages import COMMANDREQUEST, \
@ -43,7 +43,7 @@ from frappy.protocol.messages import COMMANDREQUEST, \
# replies to be handled for cache
UPDATE_MESSAGES = {EVENTREPLY, READREPLY, WRITEREPLY, ERRORPREFIX + READREQUEST, ERRORPREFIX + EVENTREPLY}
VERSIONFMT= re.compile(r'^[^,]*?ISSE[^,]*,SECoP,')
VERSIONFMT = re.compile(r'^[^,]*?ISSE[^,]*,SECoP,')
class UnregisterCallback(Exception):
@ -83,6 +83,12 @@ class CallbackObject:
def unhandledMessage(self, action, ident, data):
"""called on an unhandled message"""
def handleError(self, exc):
"""called on errors handling messages
:param exc: the exception raised (= sys.exception())
"""
def nodeStateChange(self, online, state):
"""called when the state of the connection changes
@ -105,19 +111,13 @@ class CacheItem(tuple):
inheriting from tuple: compatible with old previous version of cache
"""
def __new__(cls, value, timestamp=None, readerror=None, datatype=None):
if readerror:
assert isinstance(readerror, Exception)
else:
try:
value = datatype.import_value(value)
except (KeyError, ValueError, AttributeError):
readerror = ValueError(f'can not import {value!r} as {datatype!r}')
value = None
obj = tuple.__new__(cls, (value, timestamp, readerror))
try:
obj.format_value = datatype.format_value
except AttributeError:
obj.format_value = lambda value, unit=None: str(value)
if datatype:
try:
# override default method
obj.format_value = datatype.format_value
except AttributeError:
pass
return obj
@property
@ -144,6 +144,11 @@ class CacheItem(tuple):
return repr(self[2])
return self.format_value(self[0])
@staticmethod
def format_value(value, unit=None):
"""typically overridden with datatype.format_value"""
return str(value)
def __repr__(self):
args = (self.value,)
if self.timestamp:
@ -153,11 +158,22 @@ class CacheItem(tuple):
return f'CacheItem{repr(args)}'
class Cache(dict):
class Undefined(Exception):
def __repr__(self):
return '<undefined>'
undefined = CacheItem(None, None, Undefined())
def __missing__(self, key):
return self.undefined
class ProxyClient:
"""common functionality for proxy clients"""
CALLBACK_NAMES = {'updateEvent', 'updateItem', 'descriptiveDataChange',
'nodeStateChange', 'unhandledMessage'}
'nodeStateChange', 'unhandledMessage', 'handleError'}
online = False # connected or reconnecting since a short time
state = 'disconnected' # further possible values: 'connecting', 'reconnecting', 'connected'
log = None
@ -165,7 +181,7 @@ class ProxyClient:
def __init__(self):
self.callbacks = {cbname: defaultdict(list) for cbname in self.CALLBACK_NAMES}
# caches (module, parameter) = value, timestamp, readerror (internal names!)
self.cache = {}
self.cache = Cache() # dict returning Cache.undefined for missing keys
def register_callback(self, key, *args, **kwds):
"""register callback functions
@ -244,16 +260,18 @@ class ProxyClient:
except UnregisterCallback:
cblist.remove(cbfunc)
except Exception as e:
# the programmer should catch all errors in callbacks
# if not, the log will be flooded with errors
if self.log:
self.log.exception('error %r calling %s%r', e, cbfunc.__name__, args)
if cbname != 'handleError':
try:
e.args = [f'error in callback {cbname}{args}: {e}']
self.callback(None, 'handleError', e)
except Exception:
pass
return bool(cblist)
def updateValue(self, module, param, value, timestamp, readerror):
self.callback(None, 'updateEvent', module, param, value, timestamp, readerror)
self.callback(module, 'updateEvent', module, param, value, timestamp, readerror)
self.callback((module, param), 'updateEvent', module, param,value, timestamp, readerror)
self.callback((module, param), 'updateEvent', module, param, value, timestamp, readerror)
class SecopClient(ProxyClient):
@ -268,6 +286,8 @@ class SecopClient(ProxyClient):
descriptive_data = {}
modules = {}
_last_error = None
_update_error_count = 0
_max_error_count = 10
def __init__(self, uri, log=Logger):
super().__init__()
@ -283,6 +303,7 @@ class SecopClient(ProxyClient):
self._lock = RLock()
self._shutdown = Event()
self.cleanup = []
self.register_callback(None, self.handleError)
def __del__(self):
try:
@ -298,6 +319,7 @@ class SecopClient(ProxyClient):
with self._lock:
if self.io:
return
self._shutdown.clear()
self.txq = queue.Queue(30)
self.pending = queue.Queue(30)
self.active_requests.clear()
@ -389,43 +411,37 @@ class SecopClient(ProxyClient):
continue
self.log.debug('RX: %r', reply)
noactivity = 0
action, ident, data = decode_msg(reply)
if ident == '.':
ident = None
if action in UPDATE_MESSAGES:
module_param = self.internal.get(ident, None)
if module_param is None and ':' not in (ident or ''):
# allow missing ':value'/':target'
if action == WRITEREPLY:
module_param = self.internal.get(f'{ident}:target', None)
else:
module_param = self.internal.get(f'{ident}:value', None)
if module_param is not None:
now = time.time()
if action.startswith(ERRORPREFIX):
timestamp = data[2].get('t', now)
readerror = frappy.errors.make_secop_error(*data[0:2])
value = None
else:
timestamp = data[1].get('t', now)
value = data[0]
readerror = None
module, param = module_param
timestamp = min(now, timestamp) # no timestamps in the future!
try:
try:
action, ident, data = decode_msg(reply)
if ident == '.':
ident = None
if action in UPDATE_MESSAGES:
module_param = self.internal.get(ident, None)
if module_param is None and ':' not in (ident or ''):
# allow missing ':value'/':target'
if action == WRITEREPLY:
module_param = self.internal.get(f'{ident}:target', None)
else:
module_param = self.internal.get(f'{ident}:value', None)
if module_param is not None:
now = time.time()
if action.startswith(ERRORPREFIX):
timestamp = data[2].get('t', now)
readerror = make_secop_error(*data[0:2])
value = None
else:
timestamp = data[1].get('t', now)
value = data[0]
readerror = None
module, param = module_param
timestamp = min(now, timestamp) # no timestamps in the future!
self.updateValue(module, param, value, timestamp, readerror)
except KeyError:
pass # ignore updates of unknown parameters
except Exception as e:
self.log.debug(f'error when updating %s:%s %r', module, param, value)
try:
# catch errors in callback functions
self.updateValue(module, param, None, timestamp,
type(e)(f'{e} - raised on client side'))
except Exception as ee:
self.log.warn(f'can not handle error update %r for %s:%s: %r', e, module, param, ee)
if action in (EVENTREPLY, ERRORPREFIX + EVENTREPLY):
continue
if action in (EVENTREPLY, ERRORPREFIX + EVENTREPLY):
continue
except Exception as e:
e.args = (f'error handling SECoP message {reply!r}: {e}',)
self.callback(None, 'handleError', e)
continue
try:
key = action, ident
entry = self.active_requests.pop(key)
@ -452,17 +468,20 @@ class SecopClient(ProxyClient):
except ConnectionClosed:
pass
except Exception as e:
self.log.error('rxthread ended with %r', e)
self._rxthread = None
self.disconnect(False)
if self._shutdown.is_set():
return
if self.activate:
self.log.info('try to reconnect to %s', self.uri)
self._connthread = mkthread(self._reconnect)
else:
self.log.warning('%s disconnected', self.uri)
self._set_state(False, 'disconnected')
self._shutdown.set()
self.callback(None, 'handleError', e)
finally:
self._rxthread = None
if self._shutdown.is_set():
self.disconnect(True)
return
self.disconnect(False)
if self.activate:
self.log.info('try to reconnect to %s', self.uri)
self._connthread = mkthread(self._reconnect)
else:
self.log.warning('%s disconnected', self.uri)
self._set_state(False, 'disconnected')
def spawn_connect(self, connected_callback=None):
"""try to connect in background
@ -590,6 +609,13 @@ class SecopClient(ProxyClient):
if not self.callback(None, 'unhandledMessage', action, ident, data):
self.log.warning('unhandled message: %s %s %r', action, ident, data)
def handleError(self, exc):
if self._update_error_count < self._max_error_count:
self.log.exception('%s', exc)
self._update_error_count += 1
if self._update_error_count == self._max_error_count:
self.log.error('disabled reporting of further update errors')
def _set_state(self, online, state=None):
# remark: reconnecting is treated as online
self.online = online
@ -613,11 +639,13 @@ class SecopClient(ProxyClient):
self.cleanup.append(entry)
raise TimeoutError('no response within 10s')
if not entry[2]: # reply
if self._shutdown.is_set():
raise ConnectionError('connection shut down')
# no cleanup needed as self.active_requests will be cleared on connect
raise ConnectionError('connection closed before reply')
action, _, data = entry[2] # pylint: disable=unpacking-non-sequence
if action.startswith(ERRORPREFIX):
raise frappy.errors.make_secop_error(*data[0:2])
raise make_secop_error(*data[0:2])
return entry[2] # reply
def request(self, action, ident=None, data=None):
@ -632,7 +660,7 @@ class SecopClient(ProxyClient):
"""forced read over connection"""
try:
self.request(READREQUEST, self.identifier[module, parameter])
except frappy.errors.SECoPError:
except SECoPError:
# error reply message is already stored as readerror in cache
pass
return self.cache.get((module, parameter), None)
@ -660,7 +688,7 @@ class SecopClient(ProxyClient):
argument = datatype.export_value(argument)
else:
if argument is not None:
raise frappy.errors.WrongTypeError('command has no argument')
raise WrongTypeError('command has no argument')
# pylint: disable=unsubscriptable-object
data, qualifiers = self.request(COMMANDREQUEST, self.identifier[module, command], argument)[2]
datatype = self.modules[module]['commands'][command]['datatype'].result
@ -669,8 +697,12 @@ class SecopClient(ProxyClient):
return data, qualifiers
def updateValue(self, module, param, value, timestamp, readerror):
entry = CacheItem(value, timestamp, readerror,
self.modules[module]['parameters'][param]['datatype'])
datatype = self.modules[module]['parameters'][param]['datatype']
if readerror:
assert isinstance(readerror, Exception)
else:
value = datatype.import_value(value)
entry = CacheItem(value, timestamp, readerror, datatype)
self.cache[(module, param)] = entry
self.callback(None, 'updateItem', module, param, entry)
self.callback(module, 'updateItem', module, param, entry)

View File

@ -218,7 +218,7 @@ class IsErrorError(SECoPError):
class DisabledError(SECoPError):
"""The requested action can not be performed while the module is disabled"""
name = 'disabled'
name = 'Disabled'
class ImpossibleError(SECoPError):