diff --git a/secop_psi/historywriter.py b/secop/historywriter.py similarity index 52% rename from secop_psi/historywriter.py rename to secop/historywriter.py index 7f21c31..5fcae4c 100644 --- a/secop_psi/historywriter.py +++ b/secop/historywriter.py @@ -19,10 +19,8 @@ # ***************************************************************************** import time -try: - import frappyhistory # pylint: disable=import-error -except ImportError: - pass # do not complain when used for tests +from os.path import join +from frappyhistory.writer import Writer # pylint: disable=import-error from secop.lib import clamp, formatExtendedTraceback from secop.datatypes import IntRange, FloatRange, ScaledInteger,\ EnumType, BoolType, StringType, TupleOf, StructOf, ArrayOf, TextType @@ -41,13 +39,15 @@ def make_cvt_list(dt, tail): if isinstance(dt, (FloatRange, ScaledInteger)): opts = {'key': tail} if dt.unit: - opts['group'] = dt.unit + opts['unit'] = dt.unit opts['stepped'] = True return [(dt.import_value, opts)] if isinstance(dt, StringType): opts = {'key': tail, 'kind': 'STR'} if isinstance(dt, TextType): opts['category'] = 'no' + else: + opts['category'] = 'string' return [(lambda x: x, opts)] if isinstance(dt, TupleOf): result = [] @@ -93,7 +93,8 @@ class FrappyAbstractHistoryWriter: :param opts: a dict containing some of the following options - label: a label for the curve in the chart - - group: grouping of the curves in charts (often equal to unit) + - unit: the physical unit + - group: grouping of the curves in charts (unit by default) - stepped: lines in charts should be drawn as stepped line. Only applicable when kind='NUM' True by default. """ @@ -108,14 +109,6 @@ class FrappyAbstractHistoryWriter: """ raise NotImplementedError() - def get(self, key): - """get from cache - - :param key: the curve name - :returns: the last stored value or None - """ - raise NotImplementedError() - def close(self, timestamp): """close the writer @@ -125,32 +118,25 @@ class FrappyAbstractHistoryWriter: raise NotImplementedError() -class FrappyHistoryHandler: - def __init__(self, writer, modules, dispatcher, logger=None): - self.writer = writer - self.log = logger - self.cvt_lists = {} # dict of - self.dispatcher = dispatcher - self._init_time = time.time() - self._last_time = self._init_time +class FrappyHistory(Writer): + def __init__(self, history_path, modules, logger): + minsteps = {} # generalConfig.mintimesteps + super().__init__(history_path, logger, minsteps=minsteps) + self.__init_time = time.time() + self.__last_time = self.__init_time + # bad_timestep = set() for modname, modobj in modules.items(): + # if isinstance(modobj, HasComlog): + # modobj.enableComlog(join(history_path, 'comlog')) for pname, pobj in modobj.parameters.items(): - ident = '%s:%s' % (modname, pobj.export) key = '%s:%s' % (modname, pname) dt = pobj.datatype cvt_list = make_cvt_list(dt, key) - # create default opts - if pobj.history_category: - values = pobj.history_category.split(',') - else: - values = [None] - if len(values) == 1: - values *= len(cvt_list) - for cat, (_, opts) in zip(values, cvt_list): - if cat is None: - cat = opts.get('category', 'major' if pname in ('value', 'target') else 'minor') - opts['category'] = cat + given_opts = pobj.history + if isinstance(given_opts, dict): + given_opts = [given_opts] * len(cvt_list) + if pname == 'value': for _, opts in cvt_list: opts['key'] = opts['key'].replace(':value', '') @@ -158,75 +144,72 @@ class FrappyHistoryHandler: # default labels ':status' and ':status_text' for lbl, (_, opts) in zip([key, key + '_text'], cvt_list): opts['label'] = lbl - # overwrite opts based on history_* properties - if pobj.history_label: - for lbl, (_, opts) in zip(','.split(pobj.history_label), cvt_list): - opts['label'] = lbl - if pobj.history_group: - values = pobj.history_group.split(',') - if len(values) == 1: - values *= len(cvt_list) - for grp, (_, opts) in zip(values, cvt_list): - opts['group'] = grp - if pobj.history_stepped: - values = pobj.history_stepped - elif pobj.readonly: - values = False - if not isinstance(values, tuple): - values = [values] * len(cvt_list) - for stp, (_, opts) in zip(values, cvt_list): - if not stp and 'stepped' in opts: # only on floats - opts['stepped'] = False - cvt_list = [(key, opts) for key, opts in cvt_list if opts.get('category') != 'no'] - for _, opts in cvt_list: - if opts.get('stepped'): - opts.pop('stepped', None) - writer.put_def(**opts) - if cvt_list: - self.cvt_lists[ident] = cvt_list + label_set = set() + cvt_filtered = [] + for given, (cvt_func, opts) in zip(given_opts, cvt_list): + result = dict(opts, **given) - self.dispatcher.handle_activate(self, None, None) - self._init_time = None + stepped = result.pop('stepped', None) + if opts.get('stepped'): # True on floats + if pobj.readonly or stepped is False: + result['stepped'] = False - def close_message(self, msg): - self.writer.close(time.time()) + cat = given.get('category') + if cat is None: + if not pobj.export: + continue + cat = result.get('category') + if cat is None: + if pname in ('value', 'target'): + result['category'] = 'major' + elif pobj.readonly: + result['category'] = 'minor' + else: + result['category'] = 'param' + if cat == 'no': + continue - def send_reply(self, msg): - try: - action, ident, value = msg - assert action.endswith('update') - cvt_list = self.cvt_lists.get(ident) - if not cvt_list: - return - if self._init_time: - t = self._init_time # on initialisation, use the same timestamp for all - else: - t = value[1].get('t') - if t: - # make sure time stamp is not decreasing, as a potentially decreasing - # value might bring the reader software into trouble - t = clamp(self._last_time, t, time.time()) - else: - t = time.time() - self._last_time = t - if action == 'update': - for fun, opts in cvt_list: - # we only look at the value, qualifiers are ignored for now - # we do not use the timestamp here, as a potentially decreasing value might - # bring the reader software into trouble - # print('UPDATE', key, value, t) - self.writer.put(t, opts['key'], fun(value[0])) - else: # error_update - for _, opts in cvt_list: - self.writer.put(t, opts['key'], None) - except Exception as e: - self.log.error('FrappyHistoryHandler.send_reply: %r with msg %r: ', repr(e), msg) - print(formatExtendedTraceback()) + label = result.pop('label', None) + if label and label not in label_set: + result['label'] = label + label_set.add(label) + cvt_filtered.append((cvt_func, result)) + # if result.get('timestep', 1) < minstep: + # bad_timestep.add('%s:%s' % (modname, pname)) + self.put_def(**result) -def add_writer(history_path, srv): - # treat handler as a connection - logger = srv.log.getChild('history') - srv.dispatcher.add_connection(FrappyHistoryHandler( - frappyhistory.Writer(history_path, logger), srv.modules, srv.dispatcher, logger)) + if cvt_filtered: + def callback(value, p=pobj, history=self, cvt=cvt_filtered): + if self.__init_time: + t = self.__init_time # on initialisation, use the same timestamp for all + else: + t = p.timestamp + if t: + # make sure time stamp is not decreasing, as a potentially decreasing + # value might bring the reader software into trouble + t = clamp(self.__last_time, t, time.time()) + else: + t = time.time() + history.__last_time = t + if pobj.readerror: # error update + for _, opts in cvt: + self.put(t, opts['key'], None) + else: + for fun, opts in cvt: + self.put(t, opts['key'], fun(value)) + + modobj.valueCallbacks[pname].append(callback) + modobj.errorCallbacks[pname].append(callback) + # if bad_timestep: + # if minstep < 1: + # logger.error('timestep < generalConfig.mintimestep') + # else: + # logger.error('timestep < 1, generalConfig.mintimestep not given?') + # logger.error('parameters: %s', ', '.join(bad_timestep)) + # + self.__init_time = None + + def close(self): + super().close(max(self.__last_time, time.time())) \ No newline at end of file diff --git a/secop/params.py b/secop/params.py index 519c63e..52da84f 100644 --- a/secop/params.py +++ b/secop/params.py @@ -26,7 +26,7 @@ import inspect from secop.datatypes import BoolType, CommandType, DataType, \ - DataTypeType, EnumType, IntRange, NoneOr, OrType, \ + DataTypeType, EnumType, IntRange, NoneOr, OrType, FloatRange, \ StringType, StructOf, TextType, TupleOf, ValueType, ArrayOf from secop.errors import BadValueError, ProgrammingError from secop.properties import HasProperties, Property @@ -94,6 +94,11 @@ class Accessible(HasProperties): return '%s(%s)' % (self.__class__.__name__, ', '.join(props)) +historyStruct = StructOf(category=StringType(), label=StringType(), group=StringType(), + stepped=OrType(BoolType(), StringType()), timestep=FloatRange(0, 1), + record_unchanged=BoolType()) + + class Parameter(Accessible): """defines a parameter @@ -163,52 +168,35 @@ class Parameter(Accessible): default None: write if given in config''', NoneOr(BoolType()), export=False, default=None, settable=False) - history_category = Property( - '''[custom] category for history - - major: should be shown by default in a history chart, default for value and target - minor: to be shown optionally in a history chart, default for other parameters - no: history is not saved. default for TextType and ArrayOf + history = Property( + '''[custom] options for history - category is ignored (forced to no) for BlobType + for structured types, this is an array of options, to be applied in the order + of the created elements. - For structured types, the category may be a comma separated list, overwriting the - default for the first or all curves. - If it does not contain a comma, it applies for all curves + list of options: + + category + - major: should be shown by default in a history chart, default for value and target + - minor: to be shown optionally in a history chart, default for other parameters + - no: history is not saved. default for TextType and ArrayOf + + category is ignored (forced to no) for BlobType + + label + default: : or for main value + + group: + default: unit + + stepped: + whether a curve has to be drawn stepped or connected. + default: True when readonly=False, else False + + timestep: + the desired time step for the curve storage. maximum and default value is 1 sec ''', - NoneOr(StringType()), export=True, default=None, settable=False) - history_label = Property( - '''[custom] label for history - - default: : or for main value - - For structured types, the label may be a comma separated list, overwriting the - default for the first or all curves. - If it does not contain a comma, it applies for the first curve only. - ''', - NoneOr(StringType()), export=True, default=None, settable=False) - history_group = Property( - '''[custom] group for history - - default: unit - - For structured types, the group may be a comma separated list, overwriting the - default for the first or all curves. If it does not contain a comma, it is - applies for all curves. - ''', - NoneOr(StringType()), export=True, default=None, settable=False) - history_stepped = Property( - '''[custom] stepped curve - - Whether a curve has to be drawn stepped or connected. - default: True when readonly=False, else False - - Applicable to FloatRange and ScaledInteger only, other types are stepped by definition. - - For structured types, stepped may be a list, overwriting the default for the - first or all curves. If not, applieas to all curves. - ''', - OrType(BoolType(), ArrayOf(BoolType())), export=True, default=False, settable=False) + OrType(historyStruct, ArrayOf(historyStruct)), export=True, default={}, settable=False) # used on the instance copy only value = None diff --git a/secop/server.py b/secop/server.py index 4fa85b3..c09ac3b 100644 --- a/secop/server.py +++ b/secop/server.py @@ -114,6 +114,7 @@ class Server: self.log.warning('ambiguous sections in %s: %r' % (cfgfiles, tuple(ambiguous_sections))) self._cfgfiles = cfgfiles self._pidfile = os.path.join(cfg['piddir'], name + '.pid') + self.close_callbacks = [] def loadCfgFile(self, cfgfile): if not cfgfile.endswith('.cfg'): @@ -210,8 +211,7 @@ class Server: self.interface.serve_forever() except KeyboardInterrupt as e: self._restart = False - self.dispatcher.close() - self.interface.server_close() + self.close() if self._restart: self.restart_hook() self.log.info('restart') @@ -227,6 +227,12 @@ class Server: self._restart = False self.interface.shutdown() + def close(self): + self.dispatcher.close() + self.interface.server_close() + for cb in self.close_callbacks: + cb() + def _processCfg(self): errors = [] opts = dict(self.node_cfg) @@ -338,8 +344,13 @@ class Server: self.log.info('all modules and pollers started') history_path = os.environ.get('FRAPPY_HISTORY') if history_path: - from secop_psi.historywriter import add_writer # pylint: disable=import-outside-toplevel - add_writer(history_path, self) + try: + from secop.historywriter import FrappyHistory # pylint: disable=import-outside-toplevel + history = FrappyHistory(history_path, self.modules, self.log.getChild('history')) + self.close_callbacks.append(history.close) + except ImportError: + raise + self.log.warning('FRAPPY_HISTORY is defined, but frappyhistory package not available') # TODO: if ever somebody wants to implement an other history writer: # - a general config file /etc/secp/secop.conf or /etc/secop.conf # might be introduced, which contains the log, pid and cfg directory path and