Files
seweb/influxgraph.py
Markus Zolliker b79616fd8d fix chart config parameters
- add SEA dil pressures
- read config each time when it is used
2025-05-13 10:50:51 +02:00

461 lines
21 KiB
Python

from time import time as current_time
import logging
import json
import io
import uuid
# from configparser import ConfigParser
from math import ceil
from sehistory.seinflux import fmtime
from colors import assign_colors_to_curves
from chart_config import ChartConfig
from base import get_abs_time, HandlerBase
def split_tags(tags):
return {k: v.split(',') for k, v in tags.items()}
class InfluxGraph(HandlerBase):
"""Class implementing the logic of the different routes that are called by the client to retrieve graph data with InfluxDB.
Global constants :
HISTORICAL (int) : value that represents the "historical" visualization mode, meaning that the
most recent point is not in the visualisation window (no live data is sent).
ACTUAL (int) : value that represents the "actual" visualization mode, wihch is an intermediate
state used before going for live mode (the requested time window includes now)
LIVE (int) : value that represents the "live" visualization mode, meaning that new points are
sent to the client.
Attributes :
influx_data_getter (InfluxDataGetter) : the InfluxDataGetter instance that allows to get data out of InfluxDB.
chart_configs ([ChartConfig]) : an array of chart configuration to apply when /getvars is called
livemode (int) : the type of visualization the user is currently in. Can be HISTORICAL, ACTUAL or LIVE.
end_query (int) : the unix timestamp in seconds of the most recent requested point in time of the last query
or update.
last_values ({(str):((int), (float))}) : a dictionnary where the keys are the variable names, and the values
are tuples, where the first value is the unix timestamp of the most recent value known for this variable,
and the second value its corresponding value
variables ({(str):(str)}) : a dictionary of the current available variables requested by the client.
The key is the InfluxDB name of the curve, and the value is its label in the GUI.
"""
HISTORICAL = 0
ACTUAL = 1
LIVE = 2
def __init__(self, server, instrument, device_name, tags):
"""create instance for retrieving history
:param db: a database client (SEInflux instance)
:param instrument: the name of anm instrument or None
:param streams: a stream or comma separated list of streams
:param devices: a device name ar a comma separated list of devices
:param device_name: (comma separated) device name for labelling
typically only one of the 3 last parameters are needed
if more are specified, all of them must be fulfilled
"""
super().__init__() # put methods w_... to handlers
self.handlers['graphpoll'] = self.graphpoll
self.server = server
self.db = server.db
# self.influx_data_getter = influx_data_getter
self.chart_configs = ["./config/generic.ini"]
self.instrument = instrument
self.device_name = device_name
if instrument: # TODO: should it not be better to have inifiles per device?
self.chart_configs.append(f"./config/{instrument}.ini")
self.livemode = self.HISTORICAL
self.last_values = {} # dict <variable> of last known point (<time>, <value>)
self.last_time = {} # dict <stream> of last received time
self.last_minute = 0
self.last_update = 0 # time of last call with a result
self.tags = None
self.init_tags = tags
def w_graph(self, variables, time="-1800,0", interval=None):
"""Get the curves given by variables in the time range "time"
spaced by "interval" if given (binning/resolution)
Called when the route /graph is reached.
Parameters :
variables (str) : a comma separated string of variable names (influx names) to retrieve
time (str) : a commma separated value string (range) of seconds.
values < one year are treated as relative from now.
interval (str) : the interval (resolution) of the values to get (string in seconds)
Returns :
{"type":"graph-draw", "graph":{(str):[[(int),(float)]]}} : a dictionary with its "graph-draw" type
(so it can be processed by the client), and a "graph" dictionary with the variable names as key,
and an array of points as a tuple (timestamp, y-value as float)
"""
start, end, now = get_abs_time([float(t) for t in time.split(',')] + [0])
start, end, now = int(start), ceil(end), ceil(now)
queried_variables = variables.split(',')
self.livemode = self.ACTUAL if end+10 >= now else self.HISTORICAL
logging.info('LIVE %g %g %d %d', end, now, end >= now, self.livemode)
if interval:
interval = float(interval)
result = self.db.curves(start, end, queried_variables, merge='_measurement',
interval=interval or None, **self.tags)
self.update_last(result)
self.db.complete(result, self.last_time, 'stream')
self.last_minute = now // 60
return dict(type='graph-draw', graph={k: result[k] for k in queried_variables if k in result})
def update_last(self, curve_dict):
"""update last values per variable and last time per stream"""
for key, curve in curve_dict.items():
stream = curve.tags.get('stream')
tlast, value = curve[-1]
self.last_values[key] = curve[-1]
self.last_time[stream] = max(self.last_time.get(stream, 0), tlast)
def w_gettime(self, time):
"""Get the server time for the given time(range).
Called when the route /gettime is reached.
Parameters :
time (str="-1800,0") : the given point in time represented by a string,
which is a comma separated unix timestamp values list (in seconds).
values < one year are treated as relative from now.
Returns :
{"type":"time", "time":(int)} : a dictionary with its "time" type (so the data can be processed by the
client) and the server unix timestamp in seconds corresponding to the time asked by the client
"""
return dict(type='time', time=get_abs_time(
[float(t) for t in time.split(',')]))
def w_getvars(self, time, userconfiguration=None, **_):
"""Get the curve names available at a given point in time
with a possible user configuration on the client side.
Called when the route /getvars is reached.
Parameters :
time (str) : the given point in time represented by a string, which is a unix timestamp in seconds.
values < one year are treated as relative from now.
Might also be a comma separated time range.
userconfiguration (str|None) : the JSON string representing the user configuration
Returns :
{"type":"var_list", "device":(str), "blocks":[{"tag":(str),"unit":(str), "curves":
[{"name":(str), "label":(str), "color":(str), "original_color":(str)}]}]}:
a dictionnary with its "var_list" type (so the data can be processed by the client), the device that
was currently set at that time, and the available curves with the name of the internal variable,
the color to display for this curve, its original color in SEA, grouped by their tag (which is a
category or unit if absent) and their unit (in "blocks")
"""
time = get_abs_time([float(t) for t in time.split(',')])
start_time = int(time[0])
end_time = int(time[-1])
if userconfiguration is not None:
userconfiguration = json.loads(userconfiguration)
if self.instrument:
streams, tags, self.device_name = self.server.lookup_streams(self.instrument, **self.init_tags)
self.tags = {**self.init_tags, **tags}
else:
self.tags = self.init_tags
blocks = self.get_available_variables(start_time, end_time, self.chart_configs, userconfiguration)
# initialize self.last_values to keep track of the available variables
self.last_values = {var["name"]: [0, None] for block in blocks for var in block["curves"]}
assign_colors_to_curves(blocks)
# print('DEVICE', device_name, tags)
# for block in blocks:
# print(block['tag'], [c['name'] for c in block['curves']])
return {'type': 'var_list', 'blocks': blocks, 'device': self.device_name}
def get_available_variables(self, start_time, end_time, chart_configs=None, user_config=None):
"""Gets the available variables
(those that we can have a value for since the device has been installed
on the instrument) at the given point in time.
Here, a variable means : SECOP module name + parameter.
By default, this method returns the parameters "value" and "target",
unless the config files used in chart_configs or user_config indicates other directives.
Parameters :
start_time, send_time (int) : the unix timestamps in seconds of the point in time to get the variables at.
chart_configs ([ChartConfig] | None) :
an array of objects, each holding a configuration file for the chart.
Configurations are applied in the order of the list.
user_config ({(str):{"cat":(str), "color":(str), "unit":(str)}} | None) :
the Python dict representing the user configuration, applied at the end.
The key is <secop_module.parameter>.
Returns :
[{"tag":(str), "unit":(str), "curves":[{"name":(str), "label":(str), "color":(str)}]}] :
a list of dicts, each one representing
a block of curves with their name, their label and their color to display,
grouped by their category if given or unit (in tag).
"""
if start_time == end_time:
start_time = end_time - 3600
result = self.db.curves(start_time, end_time, _measurement=None,
merge='_measurement', **self.tags)
assert all(c.key_names[0] == '_measurement' for c in result.values())
variables = {k: t.tags.get('unit') for k, t in result.items()}
config = {}
if chart_configs:
for chart_config in chart_configs:
for key, cfg in ChartConfig(chart_config).variables.items():
config.setdefault(key, {}).update(cfg)
if user_config:
for key, cfg in user_config.items():
config.setdefault(key, {}).update(cfg)
groups = {}
def add_to_groups(name, cat=None, unit='1', color='', label=None):
if cat == '-':
return
if name.endswith('.value'):
if not cat:
cat = '*'
if not label:
label = name[:-6]
elif name.endswith('.target'):
if not cat:
cat = '*'
elif not cat:
return
unit = unit or '1'
tag = cat.replace('*', unit)
grp = groups.get(tag)
if grp is None:
crv_dict = {}
groups[tag] = {'tag': cat.replace('*', unit), 'unit': unit, 'curves': crv_dict}
else:
crv_dict = grp['curves']
crv_dict[name] = {'name': name, 'unit': unit, 'label': label or name}
# treat variables in config first (in their order!)
for key, cfg in config.items():
cat = cfg.pop('cat', None)
cfgunit = cfg.pop('unit', '')
if '.' in key:
unit = variables.pop(key, object)
if unit is not object:
add_to_groups(key, cat, cfgunit or unit, **cfg)
else:
var = f'{key}.value'
unit = variables.pop(var, object)
if unit is not object:
label = cfg.pop('label', None) or key
add_to_groups(var, cat, cfgunit or unit, label=label, **cfg)
var = f'{key}.target'
unit = variables.pop(var, object)
if unit is not object:
cfg.pop('color', None)
add_to_groups(var, cat, cfgunit or unit, **cfg)
for var, unit in variables.items():
add_to_groups(var, unit=unit)
# make order a bit more common
result = []
for key in ['K', 'T', 'W', 'ln/min'] + list(groups):
if key in groups:
group = groups.pop(key)
curve_dict = group['curves']
curves = []
# get first '.value' parameters and add targets if available
ordered_keys = [f'{m}.value' for m in ('tt', 'T', 'ts', 'Ts')]
for name in ordered_keys + list(curve_dict):
if name.endswith('.value'):
try:
curves.append(curve_dict.pop(name))
curves.append(curve_dict.pop(f'{name[:-6]}.target'))
except KeyError:
pass # skip not existing or already removed items
# add remaining curves
curves.extend(curve_dict.values())
# print(key, curves)
group['curves'] = curves
result.append(group)
return result
def w_updategraph(self):
"""Set the current visualisation mode to LIVE if not in HISTORICAL mode.
Called when the route /updategraph is reached.
Returns :
{"type":"accept-graph", "live": bool} : a dict with its "accept-graph" type and a "live"
value telling if the server could change its visualization mode to live
"""
logging.info("UPD GRAPH %d", self.livemode)
if self.livemode == self.HISTORICAL:
return dict(type='accept-graph', live=False)
else:
self.livemode = self.LIVE
return dict(type='accept-graph', live=True)
def w_export(self, variables, time, nan, interval, timeoffset=None):
"""
Returns the bytes of a dataframe with the curves given by variables in the time range "time"
Called when the route /export is reached.
Parameters :
variables (str) : a comma separataed value string of variable names (influx names) to retrieve
time (str) : a commma separated value string (range) of seconds.
nan (str) : the representation for NaN values in the TSV
interval (str) : the interval (resolution) of the values to get (string in seconds)
Returns :
io.BytesIO : an BytesIO object containing the dataframe to retrieve
"""
start, end = get_abs_time([float(t) for t in time.split(',')])
start, end = int(start), ceil(end)
queried_variables = variables.split(',')
interval = float(interval) if interval else None
timeoffset = None if timeoffset == 'now' else (timeoffset or 0)
result = self.db.export(start, end, queried_variables, timeoffset=timeoffset, none=nan, interval=interval,
**self.tags)
return io.BytesIO(result.encode('utf-8'))
def graphpoll(self):
"""
Polls the last known values for all the available variables, and returns only those whose polled values
are more recent than the most recent displayed one.
Every plain minute, all the variables are returned with a point having their last known value at the current
timestamp to synchronize all the curves on the GUI.
Returns :
{"type":"graph-update", "time":(int), "graph":{(str):[[(int),(float)]]}} | None :
a dictionary with its "graph-update" type
(so it can be processed by the client), and a "graph" dictionary with the variable names as key,
and an array of points, which are an array containing the timestamp
as their first value, and the y-value in float as their second one
"""
if self.livemode != self.LIVE:
return None
now = current_time()
if now < int(self.last_update) + 1.5:
# the server is only waiting after a None return
# this avoids to many queries with expected empty result
return None
last_time = int(min(self.last_time.values(), default=now-3600))
# if len(self.last_time) > 1:
# print('time_poll_jitter', max(self.last_time.values()) - min(self.last_time.values()))
prev_minute, self.last_minute = self.last_minute, now // 60
fullminute = prev_minute != self.last_minute
add_prev = 3600 if fullminute else 0
result = self.db.curves(last_time, None, list(self.last_values),
merge='_measurement', add_prev=add_prev, **self.tags)
to_remove = {}
for key, curve in result.items():
tlast = self.last_values.get(key, [0])[0]
# remove points older than the last known point. this might happen for different reasons:
# - queries are rounded to seconds
# - clocks of different streams might not be synched
l = len(curve)
for i, row in enumerate(curve):
if row[0] > tlast:
del curve[:i]
break
else:
if not fullminute:
to_remove[key] = l
self.update_last(result)
if fullminute:
self.db.complete(result, self.last_time, 'stream')
for key, length in to_remove.items():
curve = result[key]
if len(curve) > l:
del curve[:l]
else:
# if fullminute:
# print('R', key)
result.pop(key)
# print('poll', sum(len(c) for c in result.values()), self.last_time)
if len(result) > 0:
self.last_update = now
return dict(type='graph-update', time=last_time, graph=result)
return None
# class InfluxInstrument(HandlerBase):
#
# def __init__(self, instr_name, inst_config=None):
# super().__init__()
# self.db = InfluxDB()
# # self.influx_data_getter = InfluxDataGetter(self.db, instr_name)
# self.title = instr_name
# self.device = self.influx_data_getter.get_device_name(int(current_time()))
#
# def new_client(self):
# return self.register(InfluxClient(self))
class InfluxParams:
"""Class with dummy routes, in case client side is started with the right part init commands"""
def __init__(self):
self.id = uuid.uuid4().hex[0:15]
self.queue = []
def info(self):
return ["na"]
def w_getblock(self, path):
return dict(type='draw', title="graph", path=path, components=[])
def w_updateblock(self, path):
return dict(type='accept-block')
def w_console(self):
return dict(type='accept-console')
def w_sendcommand(self, command):
return dict(type='accept-command')
# class InfluxClient(InfluxParams, InfluxGraph):
# def __init__(self, instrument):
# InfluxParams.__init__(self)
# InfluxGraph.__init__(self, instrument)
#
# def poll(self):
# messages = self.queue
# self.queue = []
# msg = self.graphpoll()
# if msg:
# messages.append(msg)
# return messages
#
#
# class SecopInfluxClient(SecopClient, InfluxGraph):
# def __init__(self, instrument):
# SecopClient.__init__(self, instrument)
# InfluxGraph.__init__(self, instrument)
#
# def poll(self):
# messages = super().poll()
# msg = self.graphpoll()
# if msg:
# messages.append(msg)
# return messages
#
#
# class SecopInfluxInstrument(SecopInstrument):
#
# def __init__(self, inst_name, instrument_config):
# super().__init__(inst_name, instrument_config)
# config = ConfigParser()
# config.optionxform = str
# config.read("./config/influx.ini")
# section = config["INFLUX"]
# self.db = SEHistory()
# # self.db = InfluxDBWrapper(uri=section["url"], token=section["token"],
# # org=section["org"], bucket=section['bucket'])
# # self.influx_data_getter = InfluxDataGetter(self.db, inst_name)
# # self.device = self.influx_data_getter.get_device_name(int(current_time()))
#
# def get_streams(self, timestamp=None):
# return self.db.get_streams(None, timestamp)
#
# def get_experiments(self, start=None, stop=None):
# return self.db.get_experiments(start, stop)