Files
seweb/influxgraph.py
2024-10-23 16:34:12 +02:00

306 lines
13 KiB
Python

import time
import logging
import json
import io
import uuid
from influxdb import InfluxDB, InfluxDataGetter
from colors import assign_colors_to_curves
from chart_config import ChartConfig
from base import Instrument, get_abs_time
from secop import SecopClient, SecopInstrument
class InfluxGraph:
    """
    Class implementing the logic of the different routes that are called by
    the client to retrieve graph data with InfluxDB.

    Global constants :
        HISTORICAL (int) : value that represents the "historical" visualization mode, meaning that the
            most recent point is not in the visualisation window (no live data is sent).
        ACTUAL (int) : value that represents the "actual" visualization mode, which is an intermediate
            state used before going for live mode (the requested time window includes now)
        LIVE (int) : value that represents the "live" visualization mode, meaning that new points are
            sent to the client.

    Attributes :
        influx_data_getter (InfluxDataGetter) : the InfluxDataGetter instance that allows to get data out of InfluxDB.
        chart_configs ([ChartConfig]) : an array of chart configuration to apply when /getvars is called
        livemode (int) : the type of visualization the user is currently in. Can be HISTORICAL, ACTUAL or LIVE.
        end_query (int) : the unix timestamp in seconds of the most recent requested point in time of the last query
            or update.
        lastvalues ({(str):((int), (float))}) : a dictionary where the keys are the variable names, and the values
            are tuples, where the first value is the unix timestamp of the most recent value known for this variable,
            and the second value its corresponding value
        variables ({(str):(str)}) : a dictionary of the current available variables requested by the client.
            The key is the InfluxDB name of the curve, and the value is its label in the GUI.
    """

    HISTORICAL = 0
    ACTUAL = 1
    LIVE = 2

    def __init__(self, influx_data_getter, instrument):
        """
        Parameters :
            influx_data_getter (InfluxDataGetter) : the object used for every query to InfluxDB
            instrument (str) : the instrument name, used to build the path "./config/<instrument>.ini"
                of the instrument specific chart configuration
        """
        self.influx_data_getter = influx_data_getter
        # the generic configuration comes first, the instrument specific one second
        self.chart_configs = [ChartConfig("./config/generic.ini"), ChartConfig(f"./config/{instrument}.ini")]
        self.livemode = self.HISTORICAL
        self.end_query = 0
        self.lastvalues = {}
        self.variables = {}  # name:label

    def complete_to_end_and_feed_lastvalues(self, result, endtime):
        """
        Completes the data until the last requested point in time by adding the last known y-value at the end point.
        Also feeds self.lastvalues.

        Parameters :
            result ({(str):[[(int),(float)]]}) : a dictionary with the variable names as key, and an array of points,
                which are a tuple (timestamp, y-value as float). The arrays are modified in place.
            endtime (int) : the unix timestamp in seconds of the time we want to have data until
        """
        for var, curve in result.items():
            if not curve:
                # nothing known for this variable : nothing to extend, nothing to remember
                continue
            lastt, lastx = curve[-1]
            if lastt < endtime:
                curve.append((endtime, lastx))
            # the remembered timestamp is the one of the real point, not of the appended one
            self.lastvalues[var] = (lastt, lastx)

    def w_graph(self, variables, time="-1800,0", interval=None):
        """
        Gets the curves given by variables in the time range "time", spaced by "interval" if given (binning/resolution)
        Called when the route /graph is reached.

        Parameters :
            variables (str) : a comma separated value string of variable names (influx names) to retrieve
            time (str) : a comma separated value string (range) of seconds. They are treated as relative from now
                if they are lesser than one year.
            interval (str) : the interval (resolution) of the values to get (string in milliseconds)

        Returns :
            {"type":"graph-draw", "graph":{(str):[[(int),(float)]]}} : a dictionary with its "graph-draw" type
                (so it can be processed by the client), and a "graph" dictionary with the variable names as key,
                and an array of points as a tuple (timestamp, y-value as float)
        """
        time = [float(t) for t in time.split(',')]
        # the appended 0 is resolved together with the range and gives the current time
        start, end, now = get_abs_time(time + [0])
        start, end, now = int(start), int(end), int(now)
        queried_time_range = [start, end]
        queried_variables = variables.split(',')
        # a window ending less than 10 seconds before now is considered as including now
        self.livemode = self.ACTUAL if end + 10 >= now else self.HISTORICAL
        # note : the logged flag is "end >= now", without the 10 seconds tolerance used above
        logging.info('LIVE %g %g %d %d', end, now, end >= now, self.livemode)
        if interval:
            interval = int(interval)
        result = self.influx_data_getter.get_curves_in_timerange(queried_variables, queried_time_range, interval)
        self.complete_to_end_and_feed_lastvalues(result, min(end, now))
        self.end_query = end
        return dict(type='graph-draw', graph=result)

    def w_gettime(self, time):
        """
        Gets the server time for the given time.
        Called when the route /gettime is reached.

        Parameters :
            time (str) : the given point in time represented by a string, which is a comma separated unix
                timestamp values list (in seconds). They are treated as relative from now if they are lesser
                than one year.

        Returns :
            {"type":"time", "time":...} : a dictionary with its "time" type (so the data can be processed by the
                client) and, under "time", the unmodified result of get_abs_time for the given values (the other
                methods of this class unpack that result as a sequence with one entry per given value)
        """
        time = [float(t) for t in time.split(',')]
        return dict(type='time', time=get_abs_time(time))

    def w_getvars(self, time, userconfiguration=None):
        """
        Gets the curve names available at a given point in time, with a possible user configuration on the client side.
        Called when the route /getvars is reached.

        Parameters :
            time (str) : the given point in time represented by a string, which is a unix timestamp in seconds.
                It is treated as relative from now if it is lesser than one year.
                If several comma separated values are given, the last one is used.
            userconfiguration (str|None) : the JSON string representing the user configuration

        Returns :
            {"type":"var_list", "device":(str), "blocks":[{"tag":(str),"unit":(str), "curves":
            [{"name":(str), "label":(str), "color":(str), "original_color":(str)}]}]}:
                a dictionary with its "var_list" type (so the data can be processed by the client), the device that
                was currently set at that time, and the available curves with the name of the internal variable,
                the color to display for this curve, its original color in SEA, grouped by their tag (which is a
                category or unit if absent) and their unit (in "blocks")
        """
        time = [float(t) for t in time.split(',')]
        end_time = int(get_abs_time(time)[-1])
        # identity test : None means "no user configuration", anything else is parsed as JSON
        if userconfiguration is not None:
            userconfiguration = json.loads(userconfiguration)
        blocks = self.influx_data_getter.get_available_variables_at_time(end_time, self.chart_configs, userconfiguration)
        device_name = self.influx_data_getter.get_device_name(end_time)
        # updates the self.variables attribute to keep track of the available variables
        self.variables = {variable["name"]: variable["label"] for block in blocks for variable in block["curves"]}
        assign_colors_to_curves(blocks)
        result = dict(type='var_list')
        result['blocks'] = blocks
        result['device'] = device_name
        return result

    def w_updategraph(self):
        """
        Sets the current visualisation mode to LIVE if not in HISTORICAL mode.
        Called when the route /updategraph is reached.

        Returns :
            {"type":"accept-graph", "live": bool} : a dict with its "accept-graph" type and a "live"
                value telling if the server could change its visualization mode to live
        """
        logging.info("UPD GRAPH %d", self.livemode)
        if self.livemode == self.HISTORICAL:
            return dict(type='accept-graph', live=False)
        self.livemode = self.LIVE
        return dict(type='accept-graph', live=True)

    def w_export(self, variables, time, nan, interval):
        """
        Returns the bytes of a dataframe with the curves given by variables in the time range "time"
        Called when the route /export is reached.

        Parameters :
            variables (str) : a comma separated value string of variable names (influx names) to retrieve
            time (str) : a comma separated value string (range) of seconds.
            nan (str) : the representation for NaN values in the TSV
            interval (str) : the interval (resolution) of the values to get. The string "None" is forwarded
                unchanged to the data getter, any other value is converted to an int.
                NOTE(review): documented here as seconds, while w_graph documents milliseconds - confirm the unit.

        Returns :
            io.BytesIO : a BytesIO object, positioned at its start, containing the dataframe as tab separated values
        """
        time = [float(t) for t in time.split(',')]
        start, end = get_abs_time(time)
        start, end = int(start), int(end)
        queried_variables = variables.split(',')
        # "None" (as a string) is the marker for "no interval" and is deliberately not converted
        if interval != "None":
            interval = int(interval)
        df = self.influx_data_getter.get_curves_data_frame(queried_variables, [start, end], interval, self.variables)
        mem = io.BytesIO()
        df.to_csv(mem, sep="\t", index=False, float_format="%.15g", na_rep=nan)
        mem.seek(0)
        return mem

    @staticmethod
    def _discard_displayed_points(values, tlast):
        """
        Removes, in place, the leading points of values whose timestamp is not more recent than tlast.

        Only the leading run is removed : the scan stops at the first point that is more recent than tlast.
        A single slice deletion is used instead of repeated pop(0) calls, which would shift the whole
        list once per removed point.

        Parameters :
            values ([[(int),(float)]]) : the list of points (timestamp first) to trim
            tlast (int) : the timestamp of the most recent point already known
        """
        stale = 0
        for point in values:
            if point[0] > tlast:
                break
            stale += 1
        del values[:stale]

    def graphpoll(self):
        """
        Polls the last known values for all the available variables, and returns only those whose polled values
        are more recent than the most recent displayed one.
        Every plain minute, all the variables are returned with a point having their last known value at the current
        timestamp to synchronize all the curves on the GUI.
        Does nothing unless the visualization mode is LIVE.

        Returns :
            {"type":"graph-update", "time":(int), "graph":{(str):[[(int),(float)]]}} | None :
                a dictionary with its "graph-update" type
                (so it can be processed by the client), and a "graph" dictionary with the variable names as key,
                and an array of points, which are an array containing the timestamp
                as their first value, and the y-value in float as their second one.
                None if the mode is not LIVE or if there is nothing new to send.
        """
        if self.livemode != self.LIVE:
            return None
        now, = get_abs_time([0])
        result = self.influx_data_getter.poll_last_values(list(self.variables.keys()), self.lastvalues, now)
        # iterates over a copy, as entries without new points are deleted from result in the loop
        for variable, values in list(result.items()):
            tlast = self.lastvalues.get(variable, (0,))[0]
            # removes points older than the last known point
            # (queries are in seconds and might return points already displayed)
            self._discard_displayed_points(values, tlast)
            if values and values[-1][0] > tlast:
                self.lastvalues[variable] = values[-1]
            else:
                del result[variable]
        if int(now / 60) != int(self.end_query / 60):
            # Update unchanged values every plain minute
            for var, (_, lastx) in self.lastvalues.items():
                if var not in result:
                    result[var] = [(now, lastx)]
            self.end_query = now
        if len(result) > 0:
            return dict(type='graph-update', time=now, graph=result)
        return None
class InfluxInstrument(Instrument):
    """Instrument whose data is read from InfluxDB; its clients are InfluxClient instances."""

    def __init__(self, instr_name, inst_config=None):
        """
        Parameters :
            instr_name (str) : the instrument name, used for the data getter and as title
            inst_config : accepted but not used by this class
        """
        super().__init__()
        database = InfluxDB()
        getter = InfluxDataGetter(database, instr_name)
        self.db = database
        self.influx_data_getter = getter
        self.title = instr_name
        # the device name is looked up once, for the time of creation
        current_time = int(time.time())
        self.device = getter.get_device_name(current_time)

    def new_client(self):
        """Creates a new InfluxClient bound to this instrument and returns the result of registering it."""
        client = InfluxClient(self)
        return self.register(client)
class InfluxParams:
    """Class with dummy routes, in case client side is started with the right part init commands"""

    def __init__(self):
        # client identifier : the first 15 hexadecimal characters of a random UUID
        random_hex = uuid.uuid4().hex
        self.id = random_hex[:15]
        # messages waiting to be delivered to the client
        self.queue = []

    def info(self):
        """Returns a fixed placeholder list."""
        placeholder = ["na"]
        return placeholder

    def w_getblock(self, path):
        """Returns a "draw" message for the given path, titled "graph" and without any component."""
        return {'type': 'draw', 'title': "graph", 'path': path, 'components': []}

    def w_updateblock(self, path):
        """Accepts the block update without doing anything (path is ignored)."""
        return {'type': 'accept-block'}

    def w_console(self):
        """Accepts the console request without doing anything."""
        return {'type': 'accept-console'}

    def w_sendcommand(self, command):
        """Accepts the command without executing anything (command is ignored)."""
        return {'type': 'accept-command'}
class InfluxClient(InfluxParams, InfluxGraph):
    """Client combining the dummy parameter routes of InfluxParams with the graph routes of InfluxGraph."""

    def __init__(self, instrument):
        """
        Parameters :
            instrument : the instrument this client belongs to; its influx_data_getter and title
                attributes are handed over to InfluxGraph
        """
        # both bases are initialised explicitly, as their constructors take different arguments
        InfluxParams.__init__(self)
        InfluxGraph.__init__(self, instrument.influx_data_getter, instrument.title)

    def poll(self):
        """
        Returns the queued messages, followed by the graph update if graphpoll produced one.
        The queue is replaced by a new empty list.
        """
        pending, self.queue = self.queue, []
        update = self.graphpoll()
        if update:
            pending.append(update)
        return pending
class SecopInfluxClient(SecopClient, InfluxGraph):
    """Client combining the routes of SecopClient with the graph routes of InfluxGraph."""

    def __init__(self, instrument):
        """
        Parameters :
            instrument : the instrument this client belongs to; it is passed to SecopClient, and its
                influx_data_getter and title attributes are handed over to InfluxGraph
        """
        # both bases are initialised explicitly, as their constructors take different arguments
        SecopClient.__init__(self, instrument)
        InfluxGraph.__init__(self, instrument.influx_data_getter, instrument.title)

    def poll(self):
        """Returns the messages of the parent poll, followed by the graph update if graphpoll produced one."""
        pending = super().poll()
        update = self.graphpoll()
        if update:
            pending.append(update)
        return pending
class SecopInfluxInstrument(SecopInstrument):
    """SecopInstrument that additionally reads data from InfluxDB; its clients are SecopInfluxClient instances."""

    def __init__(self, inst_name, instrument_config):
        """
        Parameters :
            inst_name (str) : the instrument name, passed to SecopInstrument and to the data getter
            instrument_config : the instrument configuration, passed to SecopInstrument
        """
        super().__init__(inst_name, instrument_config)
        database = InfluxDB()
        getter = InfluxDataGetter(database, inst_name)
        self.db = database
        self.influx_data_getter = getter
        # the device name is looked up once, for the time of creation
        current_time = int(time.time())
        self.device = getter.get_device_name(current_time)

    def new_client(self):
        """Creates a new SecopInfluxClient bound to this instrument and returns the result of registering it."""
        client = SecopInfluxClient(self)
        return self.register(client)