history, as of 2022-02-01

Change-Id: I725a57546df8f7c9e5ffe04eb98872f2a609efe4
This commit is contained in:
2022-02-02 09:52:01 +01:00
parent 8253fe471b
commit 05d0cfb193
3 changed files with 130 additions and 148 deletions

View File

@ -1,232 +0,0 @@
# -*- coding: utf-8 -*-
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
# *****************************************************************************
import time
try:
import frappyhistory # pylint: disable=import-error
except ImportError:
pass # do not complain when used for tests
from secop.lib import clamp, formatExtendedTraceback
from secop.datatypes import IntRange, FloatRange, ScaledInteger,\
EnumType, BoolType, StringType, TupleOf, StructOf, ArrayOf, TextType
def make_cvt_list(dt, tail):
"""create conversion list
list of tuple (<conversion function>, <tail>, <curve options>)
tail is a postfix to be appended in case of tuples and structs
"""
if isinstance(dt, (IntRange, BoolType)):
return [(int, {'key': tail})]
if isinstance(dt, EnumType):
return [(int, {'key': tail, 'enum': dt.export_datatype()['members']})]
if isinstance(dt, (FloatRange, ScaledInteger)):
opts = {'key': tail}
if dt.unit:
opts['group'] = dt.unit
opts['stepped'] = True
return [(dt.import_value, opts)]
if isinstance(dt, StringType):
opts = {'key': tail, 'kind': 'STR'}
if isinstance(dt, TextType):
opts['category'] = 'no'
return [(lambda x: x, opts)]
if isinstance(dt, TupleOf):
result = []
for index, elmtype in enumerate(dt.members):
for fun, opts in make_cvt_list(elmtype, '%s.%s' % (tail, index)):
def conv(value, key=index, func=fun):
return func(value[key])
result.append((conv, opts))
return result
if isinstance(dt, ArrayOf):
result = []
for index in range(dt.maxlen):
for fun, opts in make_cvt_list(dt.members, '%s.%s' % (tail, index)):
opts['category'] = 'no'
def conv(value, key=index, func=fun):
return func(value[key])
result.append((conv, opts))
return result
if isinstance(dt, StructOf):
result = []
for subkey, elmtype in dt.members.items():
for fun, opts in make_cvt_list(elmtype, '%s.%s' % (tail, subkey)):
def conv(value, key=subkey, func=fun):
return func(value.get(key)) # None for missing struct key, should not be needed
result.append((conv, opts))
return result
return [] # other types (BlobType) are ignored: too much data, probably not used
class FrappyAbstractHistoryWriter:
"""abstract writer
doc only
"""
def put_def(self, key, kind='NUM', category='minor', **opts):
"""define or overwrite a new curve named <key> with options from dict <opts>
:param key: the key for the curve
:param kind: 'NUM' (default) for numeric values, 'STR' for strings
:param category: 'major' or 'minor': importance of curve
:param opts: a dict containing some of the following options
- label: a label for the curve in the chart
- group: grouping of the curves in charts (often equal to unit)
- stepped: lines in charts should be drawn as stepped line. Only applicable when kind='NUM'
True by default.
"""
raise NotImplementedError()
def put(self, timestamp, key, value):
"""add a data point
:param timestamp: the timestamp. must not decrease!
:param key: the curve name
:param value: the value to be stored (number or string), None indicates un undefined value
"""
raise NotImplementedError()
def get(self, key):
"""get from cache
:param key: the curve name
:returns: the last stored value or None
"""
raise NotImplementedError()
def close(self, timestamp):
"""close the writer
:param timestamp:
indicate to the writer that all values are getting undefined after <timestamp>
"""
raise NotImplementedError()
class FrappyHistoryHandler:
def __init__(self, writer, modules, dispatcher, logger=None):
self.writer = writer
self.log = logger
self.cvt_lists = {} # dict <mod:param> of <conversion list>
self.dispatcher = dispatcher
self._init_time = time.time()
self._last_time = self._init_time
for modname, modobj in modules.items():
for pname, pobj in modobj.parameters.items():
ident = '%s:%s' % (modname, pobj.export)
key = '%s:%s' % (modname, pname)
dt = pobj.datatype
cvt_list = make_cvt_list(dt, key)
# create default opts
if pobj.history_category:
values = pobj.history_category.split(',')
else:
values = [None]
if len(values) == 1:
values *= len(cvt_list)
for cat, (_, opts) in zip(values, cvt_list):
if cat is None:
cat = opts.get('category', 'major' if pname in ('value', 'target') else 'minor')
opts['category'] = cat
if pname == 'value':
for _, opts in cvt_list:
opts['key'] = opts['key'].replace(':value', '')
if pname == 'status':
# default labels '<modname>:status' and '<modname>:status_text'
for lbl, (_, opts) in zip([key, key + '_text'], cvt_list):
opts['label'] = lbl
# overwrite opts based on history_* properties
if pobj.history_label:
for lbl, (_, opts) in zip(','.split(pobj.history_label), cvt_list):
opts['label'] = lbl
if pobj.history_group:
values = pobj.history_group.split(',')
if len(values) == 1:
values *= len(cvt_list)
for grp, (_, opts) in zip(values, cvt_list):
opts['group'] = grp
if pobj.history_stepped:
values = pobj.history_stepped
elif pobj.readonly:
values = False
if not isinstance(values, tuple):
values = [values] * len(cvt_list)
for stp, (_, opts) in zip(values, cvt_list):
if not stp and 'stepped' in opts: # only on floats
opts['stepped'] = False
cvt_list = [(key, opts) for key, opts in cvt_list if opts.get('category') != 'no']
for _, opts in cvt_list:
if opts.get('stepped'):
opts.pop('stepped', None)
writer.put_def(**opts)
if cvt_list:
self.cvt_lists[ident] = cvt_list
self.dispatcher.handle_activate(self, None, None)
self._init_time = None
def close_message(self, msg):
self.writer.close(time.time())
def send_reply(self, msg):
try:
action, ident, value = msg
assert action.endswith('update')
cvt_list = self.cvt_lists.get(ident)
if not cvt_list:
return
if self._init_time:
t = self._init_time # on initialisation, use the same timestamp for all
else:
t = value[1].get('t')
if t:
# make sure time stamp is not decreasing, as a potentially decreasing
# value might bring the reader software into trouble
t = clamp(self._last_time, t, time.time())
else:
t = time.time()
self._last_time = t
if action == 'update':
for fun, opts in cvt_list:
# we only look at the value, qualifiers are ignored for now
# we do not use the timestamp here, as a potentially decreasing value might
# bring the reader software into trouble
# print('UPDATE', key, value, t)
self.writer.put(t, opts['key'], fun(value[0]))
else: # error_update
for _, opts in cvt_list:
self.writer.put(t, opts['key'], None)
except Exception as e:
self.log.error('FrappyHistoryHandler.send_reply: %r with msg %r: ', repr(e), msg)
print(formatExtendedTraceback())
def add_writer(history_path, srv):
# treat handler as a connection
logger = srv.log.getChild('history')
srv.dispatcher.add_connection(FrappyHistoryHandler(
frappyhistory.Writer(history_path, logger), srv.modules, srv.dispatcher, logger))