add history writer
Change-Id: I2d577dcf0c543c26680d157be959b0a608ace759
This commit is contained in:
108
secop/histwriter.py
Normal file
108
secop/histwriter.py
Normal file
@ -0,0 +1,108 @@
|
|||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
# *****************************************************************************
|
||||||
|
# This program is free software; you can redistribute it and/or modify it under
|
||||||
|
# the terms of the GNU General Public License as published by the Free Software
|
||||||
|
# Foundation; either version 2 of the License, or (at your option) any later
|
||||||
|
# version.
|
||||||
|
#
|
||||||
|
# This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
||||||
|
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
|
||||||
|
# details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU General Public License along with
|
||||||
|
# this program; if not, write to the Free Software Foundation, Inc.,
|
||||||
|
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
||||||
|
#
|
||||||
|
# Module authors:
|
||||||
|
# Markus Zolliker <markus.zolliker@psi.ch>
|
||||||
|
# *****************************************************************************
|
||||||
|
|
||||||
|
import time
|
||||||
|
from secop.datatypes import get_datatype, IntRange, FloatRange, ScaledInteger,\
|
||||||
|
EnumType, BoolType, StringType, TupleOf, StructOf
|
||||||
|
import history.histwriter
|
||||||
|
|
||||||
|
|
||||||
|
class HistWriter(history.histwriter.HistWriter):
|
||||||
|
"""extend writer to be used as an internal frappy connection"""
|
||||||
|
def __init__(self, directory, predefined_names, dispatcher):
|
||||||
|
super().__init__(directory)
|
||||||
|
self.predefined_names = predefined_names
|
||||||
|
self.parameters = {} # dict <mod:param> of (<datatype>, <hist key>)
|
||||||
|
self.activated = False
|
||||||
|
self.dispatcher = dispatcher
|
||||||
|
self._init_time = None
|
||||||
|
print('HISTINIT')
|
||||||
|
|
||||||
|
def init(self, msg):
|
||||||
|
action, _, description = msg
|
||||||
|
assert action == 'describing'
|
||||||
|
vars = []
|
||||||
|
self._init_time = time.time()
|
||||||
|
|
||||||
|
for modname, moddesc in description['modules'].items():
|
||||||
|
for pname, pdesc in moddesc['accessibles'].items():
|
||||||
|
ident = key = modname + ':' + pname
|
||||||
|
if pname.startswith('_') and pname[1:] not in self.predefined_names:
|
||||||
|
key = modname + ':' + pname[1:]
|
||||||
|
dt = get_datatype(pdesc['datainfo'])
|
||||||
|
|
||||||
|
if pname == 'value':
|
||||||
|
continuous = isinstance(dt, (FloatRange, ScaledInteger))
|
||||||
|
vars.append('%s|%s|%s||%d' % (key, dt.unit or '1', modname, continuous))
|
||||||
|
elif pname == 'target':
|
||||||
|
vars.append('%s|%s|%s_target||0' % (key, dt.unit or '1', modname))
|
||||||
|
self.parameters[ident] = dt, key
|
||||||
|
self.put(self._init_time, 'STR', 'vars', ' '.join(vars))
|
||||||
|
self.dispatcher.handle_activate(self, None, None)
|
||||||
|
self._init_time = None
|
||||||
|
return
|
||||||
|
|
||||||
|
def send_reply(self, msg):
|
||||||
|
action, ident, value = msg
|
||||||
|
if not action.endswith('update'):
|
||||||
|
print('unknown async message %r' % msg)
|
||||||
|
return
|
||||||
|
now = self._init_time or time.time() # on initialisation, make all timestamps equal
|
||||||
|
dt, key = self.parameters[ident]
|
||||||
|
if action == 'update':
|
||||||
|
|
||||||
|
def convert(value, dt, key):
|
||||||
|
if isinstance(dt, (EnumType, IntRange, BoolType)):
|
||||||
|
return [('NUM', key, str(int(value)))]
|
||||||
|
if isinstance(dt, (FloatRange, ScaledInteger)):
|
||||||
|
return [('NUM', key, str(dt.import_value(value)))]
|
||||||
|
if isinstance(dt, StringType):
|
||||||
|
return [('STR', key, value)]
|
||||||
|
if isinstance(dt, TupleOf):
|
||||||
|
return sum((convert(value[i], d, '%s.%s' % (key, i)) for i, d in enumerate(dt.members)), [])
|
||||||
|
if isinstance(dt, StructOf):
|
||||||
|
return sum((convert(value[k], d, '%s.%s' % (key, k)) for d, k in dt.members.items()), [])
|
||||||
|
# ArrayType, BlobType and TextType are not considered: too much data, proabably not used
|
||||||
|
return []
|
||||||
|
|
||||||
|
# omit qualifiers. we do not use the timestamp here, as a potentially decreasing
|
||||||
|
# values might get the reader software into trouble
|
||||||
|
result = convert(value[0], dt, key)
|
||||||
|
for htype, key, strval in convert(value[0], dt, key):
|
||||||
|
self.put(now, htype, key, strval)
|
||||||
|
|
||||||
|
else: # error_update
|
||||||
|
old = self.cache.get(key)
|
||||||
|
if old is None:
|
||||||
|
return # ignore if this key is not yet used
|
||||||
|
|
||||||
|
def get_keys(dt, key):
|
||||||
|
if isinstance(dt, (IntRange, FloatRange, ScaledInteger, BoolType, EnumType)):
|
||||||
|
return [('NUM', key)]
|
||||||
|
if isinstance(dt, StringType):
|
||||||
|
return [('STR', key)]
|
||||||
|
if isinstance(dt, TupleOf):
|
||||||
|
return sum((get_keys(d, '%s.%s' % (key, i)) for i, d in enumerate(dt.members)), [])
|
||||||
|
if isinstance(dt, StructOf):
|
||||||
|
return sum((get_keys(d, '%s.%s' % (key, k)) for d, k in dt.members.items()), [])
|
||||||
|
return []
|
||||||
|
|
||||||
|
for htype, key in get_keys(dt, key):
|
||||||
|
self.put(now, htype, key, '')
|
@ -42,6 +42,7 @@ except ImportError:
|
|||||||
from secop.errors import ConfigError
|
from secop.errors import ConfigError
|
||||||
from secop.lib import formatException, get_class, getGeneralConfig
|
from secop.lib import formatException, get_class, getGeneralConfig
|
||||||
from secop.modules import Attached
|
from secop.modules import Attached
|
||||||
|
from secop.params import PREDEFINED_ACCESSIBLES
|
||||||
|
|
||||||
try:
|
try:
|
||||||
import systemd.daemon
|
import systemd.daemon
|
||||||
@ -126,7 +127,7 @@ class Server:
|
|||||||
else:
|
else:
|
||||||
filename = None
|
filename = None
|
||||||
if filename is None:
|
if filename is None:
|
||||||
raise ConfigError("Couldn't find cfg file %r" % cfgfile)
|
raise ConfigError("Couldn't find cfg file %r in %s" % (cfgfile, cfg['confdir']))
|
||||||
self.log.debug('Parse config file %s ...' % filename)
|
self.log.debug('Parse config file %s ...' % filename)
|
||||||
result = OrderedDict()
|
result = OrderedDict()
|
||||||
parser = configparser.ConfigParser()
|
parser = configparser.ConfigParser()
|
||||||
@ -270,3 +271,10 @@ class Server:
|
|||||||
if not event.wait(timeout=max(0, deadline - time.time())):
|
if not event.wait(timeout=max(0, deadline - time.time())):
|
||||||
self.log.info('WARNING: timeout when starting %s' % name)
|
self.log.info('WARNING: timeout when starting %s' % name)
|
||||||
self.log.info('all modules and pollers started')
|
self.log.info('all modules and pollers started')
|
||||||
|
history_path = os.environ.get('FRAPPY_HISTORY')
|
||||||
|
if history_path:
|
||||||
|
from secop.histwriter import HistWriter
|
||||||
|
writer = HistWriter(history_path, PREDEFINED_ACCESSIBLES.keys(), self.dispatcher)
|
||||||
|
# treat writer as a connection
|
||||||
|
self.dispatcher.add_connection(writer)
|
||||||
|
writer.init(self.dispatcher.handle_describe(writer, None, None))
|
||||||
|
Reference in New Issue
Block a user