major rework

using influxdb structure independed of nicos cache
This commit is contained in:
2025-02-25 14:29:25 +01:00
parent 7aef895462
commit 2c59e37074
12 changed files with 381 additions and 272 deletions

View File

@ -1,19 +1,23 @@
import time
from time import time as current_time
import logging
import json
import io
import uuid
from influxdb import InfluxDB, InfluxDataGetter
from configparser import ConfigParser
from math import ceil
from sehistory.influx import InfluxDBWrapper
from colors import assign_colors_to_curves
from chart_config import ChartConfig
from base import Instrument, get_abs_time
from secop import SecopClient, SecopInstrument
def split_tags(tags):
return {k: v.split(',') for k, v in tags.items()}
class InfluxGraph:
"""
Class implementing the logic of the different routes that are called by
the client to retrieve graph data with InfluxDB.
"""Class implementing the logic of the different routes that are called by the client to retrieve graph data with InfluxDB.
Global constants :
HISTORICAL (int) : value that represents the "historical" visualization mode, meaning that the
@ -29,96 +33,99 @@ class InfluxGraph:
livemode (int) : the type of visualization the user is currently in. Can be HISTORICAL, ACTUAL or LIVE.
end_query (int) : the unix timestamp in seconds of the most recent requested point in time of the last query
or update.
lastvalues ({(str):((int), (float))}) : a dictionnary where the keys are the variable names, and the values
last_values ({(str):((int), (float))}) : a dictionnary where the keys are the variable names, and the values
are tuples, where the first value is the unix timestamp of the most recent value known for this variable,
and the second value its corresponding value
variables ({(str):(str)}) : a dictionnary of the current available variables requested by the client.
variables ({(str):(str)}) : a dictionary of the current available variables requested by the client.
The key is the InfluxDB name of the curve, and the value is its label in the GUI.
"""
HISTORICAL = 0
ACTUAL = 1
LIVE = 2
def __init__(self, influx_data_getter, instrument):
self.influx_data_getter = influx_data_getter
self.chart_configs = [ChartConfig("./config/generic.ini"), ChartConfig(f"./config/{instrument}.ini")]
def __init__(self, instrument):
self.instrument = instrument
self.db = instrument.db
instrument_name = instrument.title
# self.influx_data_getter = influx_data_getter
self.chart_configs = [ChartConfig("./config/generic.ini")]
try:
self.chart_configs.append(ChartConfig(f"./config/{instrument_name}.ini"))
except KeyError:
pass
self.livemode = self.HISTORICAL
self.end_query = 0
self.lastvalues = {}
self.variables = {} # name:label
def complete_to_end_and_feed_lastvalues(self, result, endtime):
"""
Completes the data until the last requested point in time by adding the last known y-value at the end point.
Also feeds self.lastvalues.
Parameters :
result ({(str):[[(int),(float)]]}) : a dictionnary with the variable names as key, and an array of points,
which are a tuple (timestamp, y-value as float)
endtime (int) : the unix timestamp in seconds of the time we want to have data until
"""
for var, c in result.items():
if c:
lastt, lastx = c[-1]
if lastt < endtime:
c.append((endtime, lastx))
self.lastvalues[var] = (lastt, lastx)
self.last_values = {} # dict <variable> of last known point (<time>, <value>)
self.last_time = {} # dict <stream> of last received time
self.last_minute = 0
self.last_update = 0 # time of last call with a result
self.tags = {} # tags for query (determines device and/or server)
def w_graph(self, variables, time="-1800,0", interval=None):
"""
Gets the curves given by variables in the time range "time", spaced by "interval" if given (binning/resolution)
"""Get the curves given by variables in the time range "time"
spaced by "interval" if given (binning/resolution)
Called when the route /graph is reached.
Parameters :
variables (str) : a comma separataed value string of variable names (influx names) to retrieve
time (str) : a commma separated value string (range) of seconds. They are treated as relative from now
if they are lesser than one year.
interval (str) : the interval (resolution) of the values to get (string in milliseconds)
variables (str) : a comma separated string of variable names (influx names) to retrieve
time (str) : a commma separated value string (range) of seconds.
values < one year are treated as relative from now.
interval (str) : the interval (resolution) of the values to get (string in seconds)
Returns :
{"type":"graph-draw", "graph":{(str):[[(int),(float)]]}} : a dictionnary with its "graph-draw" type
(so it can be processed by the client), and a "graph" dictionnary with the variable names as key,
{"type":"graph-draw", "graph":{(str):[[(int),(float)]]}} : a dictionary with its "graph-draw" type
(so it can be processed by the client), and a "graph" dictionary with the variable names as key,
and an array of points as a tuple (timestamp, y-value as float)
"""
time = [float(t) for t in time.split(',')]
start, end, now = get_abs_time(time + [0])
start, end, now = int(start), int(end), int(now)
queried_time_range = [start, end]
start, end, now = get_abs_time([float(t) for t in time.split(',')] + [0])
start, end, now = int(start), ceil(end), ceil(now)
queried_variables = variables.split(',')
self.livemode = self.ACTUAL if end+10 >= now else self.HISTORICAL
logging.info('LIVE %g %g %d %d', end, now, end >= now, self.livemode)
if interval : interval = int(interval)
if interval:
interval = float(interval)
result = self.db.curves(start, end, queried_variables, merge='_measurement',
interval=interval or None, **self.tags)
self.update_last(result)
self.db.complete(result, self.last_time, 'stream')
self.last_minute = now // 60
return dict(type='graph-draw', graph={k: v for k, v in result.items()})
result = self.influx_data_getter.get_curves_in_timerange(queried_variables, queried_time_range, interval)
self.complete_to_end_and_feed_lastvalues(result, min(end, now))
self.end_query = end
return dict(type='graph-draw', graph=result)
def update_last(self, curve_dict):
"""update last values per variable and last time per stream"""
for key, curve in curve_dict.items():
stream = curve.tags.get('stream')
tlast, value = curve[-1]
self.last_values[key] = curve[-1]
self.last_time[stream] = max(self.last_time.get(stream, 0), tlast)
def w_gettime(self, time):
"""
Gets the server time for the give time.
"""Get the server time for the given time(range).
Called when the route /gettime is reached.
Parameters :
time (str="-1800,0") : the given point in time represented by a string, which is a comma separated unix
timestamp values list (in seconds). They are treated as relative from now if they are lesser than one year.
time (str="-1800,0") : the given point in time represented by a string,
which is a comma separated unix timestamp values list (in seconds).
values < one year are treated as relative from now.
Returns :
{"type":"time", "time":(int)} : a dictionnary with its "time" type (so the data can be processed by the
{"type":"time", "time":(int)} : a dictionary with its "time" type (so the data can be processed by the
client) and the server unix timestamp in seconds corresponding to the time asked by the client
"""
time = [float(t) for t in time.split(',')]
return dict(type='time', time=get_abs_time(time))
return dict(type='time', time=get_abs_time(
[float(t) for t in time.split(',')]))
def w_getvars(self, time, userconfiguration = None):
"""
Gets the curve names available at a given point in time, with a possible user configuration on the client side.
def w_getvars(self, time, userconfiguration=None, instrument=None, **tags):
"""Get the curve names available at a given point in time
with a possible user configuration on the client side.
Called when the route /getvars is reached.
Parameters :
time (str) : the given point in time represented by a string, which is a unix timestamp in seconds.
It is treated as relative from now if it is lesser than one year.
values < one year are treated as relative from now.
Might also be a comma separated time range.
userconfiguration (str|None) : the JSON string representing the user configuration
Returns :
@ -130,25 +137,118 @@ class InfluxGraph:
category or unit if absent) and their unit (in "blocks")
"""
time = [float(t) for t in time.split(',')]
end_time = int(get_abs_time(time)[-1])
if not userconfiguration == None : userconfiguration = json.loads(userconfiguration)
blocks = self.influx_data_getter.get_available_variables_at_time(end_time, self.chart_configs, userconfiguration)
device_name = self.influx_data_getter.get_device_name(end_time)
# updates the self.variables attribute to keep track of the available variables
self.variables = {variable["name"]:variable["label"] for block in blocks for variable in block["curves"]}
time = get_abs_time([float(t) for t in time.split(',')])
start_time = int(time[0])
end_time = int(time[-1])
if userconfiguration is not None:
userconfiguration = json.loads(userconfiguration)
self.tags = split_tags(tags)
if instrument:
tags['stream'] = list(self.db.get_streams(instrument))
print('GETAV', self.tags)
blocks = self.get_available_variables(start_time, end_time, self.chart_configs, userconfiguration)
device_name = tags.get('device', '<unknown>')
# initialize self.last_values to keep track of the available variables
self.last_values = {var["name"]: [0, None] for block in blocks for var in block["curves"]}
assign_colors_to_curves(blocks)
result = dict(type='var_list')
result['blocks'] = blocks
result['device'] = device_name
print('DEVICE', device_name, tags)
for block in blocks:
print(block['tag'], [c['name'] for c in block['curves']])
return result
def w_updategraph(self):
def get_available_variables(self, start_time, end_time, chart_configs=None, user_config=None):
"""Gets the available variables
(those that we can have a value for since the device has been installed
on the instrument) at the given point in time.
Here, a variable means : SECOP module name + parameter.
By default, this method returns the parameters "value" and "target",
unless the config files used in chart_configs or user_config indicates other directives.
Parameters :
start_time, send_time (int) : the unix timestamps in seconds of the point in time to get the variables at.
chart_configs ([ChartConfig] | None) :
an array of objects, each holding a configuration file for the chart.
Configurations are applied in the order of the list.
user_config ({(str):{"cat":(str), "color":(str), "unit":(str)}} | None) :
the Python dict representing the user configuration, applied at the end.
The key is <secop_module.parameter>.
Returns :
[{"tag":(str), "unit":(str), "curves":[{"name":(str), "label":(str), "color":(str)}]}] :
a list of dictionnaries, each one representing
a block of curves with their name, their label and their color to display,
grouped by their category if given or unit (in tag).
"""
Sets the current visualisation mode to LIVE if not in HISTORICAL mode.
if start_time == end_time:
start_time = end_time - 3600
result = self.db.curves(start_time, end_time, _measurement=None, _field='float', **self.tags)
assert all(c.key_names[0] == '_measurement' for c in result.values())
variables = {k[0] for k in result}
config = {}
if chart_configs:
for chart_config in chart_configs:
for key, cfg in chart_config.variables.items():
config.setdefault(key, {}).update(cfg)
if user_config:
for key, cfg in user_config.items():
config.setdefault(key, {}).update(cfg)
groups = {}
def add_to_groups(name, cat=None, unit='1', color='', label=None):
if cat == '-':
return
if name.endswith('.value'):
if not cat:
cat = '*'
if not label:
label = name[:-6]
elif name.endswith('.target'):
if not cat:
cat = '*'
elif not cat:
return
tag = cat.replace('*', unit)
grp = groups.get(tag)
if grp is None:
curves = []
groups[tag] = {'tag': cat.replace('*', unit), 'unit': unit, 'curves': curves}
else:
curves = grp['curves']
curves.append({'name': name, 'unit': unit, 'label': label or name})
# treat variables in config first (in their order!)
result = {}
for key, cfg in config.items():
cat = cfg.pop('cat', None)
unit = cfg.get('unit', '1')
if '.' in key:
if key in variables:
add_to_groups(key, cat, **cfg)
variables.discard(key)
else:
var = f'{key}.value'
if var in variables:
label = cfg.pop('label', None) or key
add_to_groups(var, cat, label=label, **cfg)
variables.discard(var)
var = f'{key}.target'
if var in variables:
cfg.pop('color', None)
add_to_groups(var, cat, **cfg)
variables.discard(var)
for var in variables:
add_to_groups(var)
return list(groups.values())
def w_updategraph(self):
"""Set the current visualisation mode to LIVE if not in HISTORICAL mode.
Called when the route /updategraph is reached.
Returns :
{"type":"accept-graph", "live": bool} : a dict with its "accept-graph" type and a "live"
@ -161,7 +261,7 @@ class InfluxGraph:
self.livemode = self.LIVE
return dict(type='accept-graph', live=True)
def w_export(self, variables, time, nan, interval):
def w_export(self, variables, time, nan, interval, timeoffset=None):
"""
Returns the bytes of a dataframe with the curves given by variables in the time range "time"
Called when the route /export is reached.
@ -176,19 +276,15 @@ class InfluxGraph:
io.BytesIO : an BytesIO object containing the dataframe to retrieve
"""
time = [float(t) for t in time.split(',')]
start, end = get_abs_time(time)
start, end = int(start), int(end)
start, end = get_abs_time([float(t) for t in time.split(',')])
start, end = int(start), ceil(end)
queried_variables = variables.split(',')
if interval != "None" : interval = int(interval)
df = self.influx_data_getter.get_curves_data_frame(queried_variables, [start, end], interval, self.variables)
mem = io.BytesIO()
df.to_csv(mem, sep="\t", index=False, float_format="%.15g", na_rep=nan)
mem.seek(0)
return mem
interval = float(interval) if interval else None
timeoffset = None if timeoffset == 'now' else (timeoffset or 0)
result = self.db.export(start, end, queried_variables, timeoffset=timeoffset, none=nan, interval=interval,
**self.tags)
return io.BytesIO(result.encode('utf-8'))
def graphpoll(self):
"""
@ -199,45 +295,65 @@ class InfluxGraph:
Returns :
{"type":"graph-update", "time":(int), "graph":{(str):[[(int),(float)]]}} | None :
a dictionnary with its "graph-update" type
(so it can be processed by the client), and a "graph" dictionnary with the variable names as key,
a dictionary with its "graph-update" type
(so it can be processed by the client), and a "graph" dictionary with the variable names as key,
and an array of points, which are an array containing the timestamp
as their first value, and the y-value in float as their second one
"""
if self.livemode != self.LIVE:
return None
now, = get_abs_time([0])
result = self.influx_data_getter.poll_last_values(list(self.variables.keys()), self.lastvalues, now)
for variable, values in list(result.items()):
tlast = self.lastvalues.get(variable, (0,))[0]
# removes points older than the last known point
# (queries are in seconds and might return points already displayed)
while values and values[0][0] <= tlast:
values.pop(0)
if values and values[-1][0] > tlast:
self.lastvalues[variable] = values[-1]
now = current_time()
if now < int(self.last_update) + 1.5:
# the server is only waiting after a None return
# this avoids to many queries with expected empty result
return None
last_time = int(min(self.last_time.values()))
# if len(self.last_time) > 1:
# print('time_poll_jitter', max(self.last_time.values()) - min(self.last_time.values()))
prev_minute, self.last_minute = self.last_minute, now // 60
fullminute = prev_minute != self.last_minute
add_prev = 3600 if fullminute else 0
result = self.db.curves(last_time, None, list(self.last_values),
merge='_measurement', add_prev=add_prev, **self.tags)
to_remove = {}
for key, curve in result.items():
tlast = self.last_values.get(key, [0])[0]
# remove points older than the last known point. this might happen for different reasons:
# - queries are rounded to seconds
# - clocks of different streams might not be synched
l = len(curve)
for i, row in enumerate(curve):
if row[0] > tlast:
del curve[:i]
break
else:
del result[variable]
if int(now / 60) != int(self.end_query / 60):
# Update unchanged values every plain minute
for var, (_, lastx) in self.lastvalues.items():
if var not in result:
result[var] = [(now, lastx)]
self.end_query = now
if not fullminute:
to_remove[key] = l
self.update_last(result)
if fullminute:
self.db.complete(result, self.last_time, 'stream')
for key, length in to_remove.items():
curve = result[key]
if len(curve) > l:
del curve[:l]
else:
if fullminute:
print('R', key)
result.pop(key)
# print('poll', sum(len(c) for c in result.values()), self.last_time)
if len(result) > 0:
return dict(type='graph-update', time=now, graph=result)
self.last_update = now
return dict(type='graph-update', time=last_time, graph=result)
return None
class InfluxInstrument(Instrument):
def __init__(self, instr_name, inst_config=None):
super().__init__()
self.db = InfluxDB()
self.influx_data_getter = InfluxDataGetter(self.db, instr_name)
# self.influx_data_getter = InfluxDataGetter(self.db, instr_name)
self.title = instr_name
self.device = self.influx_data_getter.get_device_name(int(time.time()))
self.device = self.influx_data_getter.get_device_name(int(current_time()))
def new_client(self):
return self.register(InfluxClient(self))
@ -268,7 +384,7 @@ class InfluxParams:
class InfluxClient(InfluxParams, InfluxGraph):
def __init__(self, instrument):
InfluxParams.__init__(self)
InfluxGraph.__init__(self, instrument.influx_data_getter, instrument.title)
InfluxGraph.__init__(self, instrument)
def poll(self):
messages = self.queue
@ -282,7 +398,7 @@ class InfluxClient(InfluxParams, InfluxGraph):
class SecopInfluxClient(SecopClient, InfluxGraph):
def __init__(self, instrument):
SecopClient.__init__(self, instrument)
InfluxGraph.__init__(self, instrument.influx_data_getter, instrument.title)
InfluxGraph.__init__(self, instrument)
def poll(self):
messages = super().poll()
@ -296,10 +412,20 @@ class SecopInfluxInstrument(SecopInstrument):
def __init__(self, inst_name, instrument_config):
super().__init__(inst_name, instrument_config)
self.db = InfluxDB()
self.influx_data_getter = InfluxDataGetter(self.db, inst_name)
self.device = self.influx_data_getter.get_device_name(int(time.time()))
config = ConfigParser()
config.optionxform = str
config.read("./config/influx.ini")
section = config["INFLUX"]
self.db = InfluxDBWrapper('linse-c')
# self.db = InfluxDBWrapper(uri=section["url"], token=section["token"],
# org=section["org"], bucket=section['bucket'])
# self.influx_data_getter = InfluxDataGetter(self.db, inst_name)
# self.device = self.influx_data_getter.get_device_name(int(current_time()))
def new_client(self):
return self.register(SecopInfluxClient(self))
def get_stream_tags(self, timestamp=None):
return self.db.get_streams(None, timestamp)