461 lines
21 KiB
Python
461 lines
21 KiB
Python
from time import time as current_time
|
|
import logging
|
|
import json
|
|
import io
|
|
import uuid
|
|
# from configparser import ConfigParser
|
|
from math import ceil
|
|
from sehistory.seinflux import fmtime
|
|
from colors import assign_colors_to_curves
|
|
from chart_config import ChartConfig
|
|
from base import get_abs_time, HandlerBase
|
|
|
|
|
|
def split_tags(tags):
|
|
return {k: v.split(',') for k, v in tags.items()}
|
|
|
|
|
|
class InfluxGraph(HandlerBase):
|
|
"""Class implementing the logic of the different routes that are called by the client to retrieve graph data with InfluxDB.
|
|
|
|
Global constants :
|
|
HISTORICAL (int) : value that represents the "historical" visualization mode, meaning that the
|
|
most recent point is not in the visualisation window (no live data is sent).
|
|
ACTUAL (int) : value that represents the "actual" visualization mode, wihch is an intermediate
|
|
state used before going for live mode (the requested time window includes now)
|
|
LIVE (int) : value that represents the "live" visualization mode, meaning that new points are
|
|
sent to the client.
|
|
|
|
Attributes :
|
|
influx_data_getter (InfluxDataGetter) : the InfluxDataGetter instance that allows to get data out of InfluxDB.
|
|
chart_configs ([ChartConfig]) : an array of chart configuration to apply when /getvars is called
|
|
livemode (int) : the type of visualization the user is currently in. Can be HISTORICAL, ACTUAL or LIVE.
|
|
end_query (int) : the unix timestamp in seconds of the most recent requested point in time of the last query
|
|
or update.
|
|
last_values ({(str):((int), (float))}) : a dictionnary where the keys are the variable names, and the values
|
|
are tuples, where the first value is the unix timestamp of the most recent value known for this variable,
|
|
and the second value its corresponding value
|
|
variables ({(str):(str)}) : a dictionary of the current available variables requested by the client.
|
|
The key is the InfluxDB name of the curve, and the value is its label in the GUI.
|
|
"""
|
|
HISTORICAL = 0
|
|
ACTUAL = 1
|
|
LIVE = 2
|
|
|
|
def __init__(self, server, instrument, device_name, tags):
|
|
"""create instance for retrieving history
|
|
|
|
:param db: a database client (SEInflux instance)
|
|
:param instrument: the name of anm instrument or None
|
|
:param streams: a stream or comma separated list of streams
|
|
:param devices: a device name ar a comma separated list of devices
|
|
:param device_name: (comma separated) device name for labelling
|
|
typically only one of the 3 last parameters are needed
|
|
if more are specified, all of them must be fulfilled
|
|
"""
|
|
super().__init__() # put methods w_... to handlers
|
|
self.handlers['graphpoll'] = self.graphpoll
|
|
self.server = server
|
|
self.db = server.db
|
|
# self.influx_data_getter = influx_data_getter
|
|
self.chart_configs = ["./config/generic.ini"]
|
|
self.instrument = instrument
|
|
self.device_name = device_name
|
|
if instrument: # TODO: should it not be better to have inifiles per device?
|
|
self.chart_configs.append(f"./config/{instrument}.ini")
|
|
self.livemode = self.HISTORICAL
|
|
self.last_values = {} # dict <variable> of last known point (<time>, <value>)
|
|
self.last_time = {} # dict <stream> of last received time
|
|
self.last_minute = 0
|
|
self.last_update = 0 # time of last call with a result
|
|
self.tags = None
|
|
self.init_tags = tags
|
|
|
|
def w_graph(self, variables, time="-1800,0", interval=None):
|
|
"""Get the curves given by variables in the time range "time"
|
|
|
|
spaced by "interval" if given (binning/resolution)
|
|
Called when the route /graph is reached.
|
|
|
|
Parameters :
|
|
variables (str) : a comma separated string of variable names (influx names) to retrieve
|
|
time (str) : a commma separated value string (range) of seconds.
|
|
values < one year are treated as relative from now.
|
|
interval (str) : the interval (resolution) of the values to get (string in seconds)
|
|
|
|
Returns :
|
|
{"type":"graph-draw", "graph":{(str):[[(int),(float)]]}} : a dictionary with its "graph-draw" type
|
|
(so it can be processed by the client), and a "graph" dictionary with the variable names as key,
|
|
and an array of points as a tuple (timestamp, y-value as float)
|
|
"""
|
|
start, end, now = get_abs_time([float(t) for t in time.split(',')] + [0])
|
|
start, end, now = int(start), ceil(end), ceil(now)
|
|
queried_variables = variables.split(',')
|
|
self.livemode = self.ACTUAL if end+10 >= now else self.HISTORICAL
|
|
logging.info('LIVE %g %g %d %d', end, now, end >= now, self.livemode)
|
|
if interval:
|
|
interval = float(interval)
|
|
result = self.db.curves(start, end, queried_variables, merge='_measurement',
|
|
interval=interval or None, **self.tags)
|
|
self.update_last(result)
|
|
self.db.complete(result, self.last_time, 'stream')
|
|
self.last_minute = now // 60
|
|
return dict(type='graph-draw', graph={k: result[k] for k in queried_variables if k in result})
|
|
|
|
def update_last(self, curve_dict):
|
|
"""update last values per variable and last time per stream"""
|
|
for key, curve in curve_dict.items():
|
|
stream = curve.tags.get('stream')
|
|
tlast, value = curve[-1]
|
|
self.last_values[key] = curve[-1]
|
|
self.last_time[stream] = max(self.last_time.get(stream, 0), tlast)
|
|
|
|
def w_gettime(self, time):
|
|
"""Get the server time for the given time(range).
|
|
|
|
Called when the route /gettime is reached.
|
|
|
|
Parameters :
|
|
time (str="-1800,0") : the given point in time represented by a string,
|
|
which is a comma separated unix timestamp values list (in seconds).
|
|
values < one year are treated as relative from now.
|
|
|
|
Returns :
|
|
{"type":"time", "time":(int)} : a dictionary with its "time" type (so the data can be processed by the
|
|
client) and the server unix timestamp in seconds corresponding to the time asked by the client
|
|
"""
|
|
return dict(type='time', time=get_abs_time(
|
|
[float(t) for t in time.split(',')]))
|
|
|
|
def w_getvars(self, time, userconfiguration=None, **_):
|
|
"""Get the curve names available at a given point in time
|
|
|
|
with a possible user configuration on the client side.
|
|
Called when the route /getvars is reached.
|
|
|
|
Parameters :
|
|
time (str) : the given point in time represented by a string, which is a unix timestamp in seconds.
|
|
values < one year are treated as relative from now.
|
|
Might also be a comma separated time range.
|
|
userconfiguration (str|None) : the JSON string representing the user configuration
|
|
|
|
Returns :
|
|
{"type":"var_list", "device":(str), "blocks":[{"tag":(str),"unit":(str), "curves":
|
|
[{"name":(str), "label":(str), "color":(str), "original_color":(str)}]}]}:
|
|
a dictionnary with its "var_list" type (so the data can be processed by the client), the device that
|
|
was currently set at that time, and the available curves with the name of the internal variable,
|
|
the color to display for this curve, its original color in SEA, grouped by their tag (which is a
|
|
category or unit if absent) and their unit (in "blocks")
|
|
"""
|
|
|
|
time = get_abs_time([float(t) for t in time.split(',')])
|
|
start_time = int(time[0])
|
|
end_time = int(time[-1])
|
|
if userconfiguration is not None:
|
|
userconfiguration = json.loads(userconfiguration)
|
|
|
|
if self.instrument:
|
|
streams, tags, self.device_name = self.server.lookup_streams(self.instrument, **self.init_tags)
|
|
self.tags = {**self.init_tags, **tags}
|
|
else:
|
|
self.tags = self.init_tags
|
|
blocks = self.get_available_variables(start_time, end_time, self.chart_configs, userconfiguration)
|
|
# initialize self.last_values to keep track of the available variables
|
|
self.last_values = {var["name"]: [0, None] for block in blocks for var in block["curves"]}
|
|
assign_colors_to_curves(blocks)
|
|
# print('DEVICE', device_name, tags)
|
|
# for block in blocks:
|
|
# print(block['tag'], [c['name'] for c in block['curves']])
|
|
return {'type': 'var_list', 'blocks': blocks, 'device': self.device_name}
|
|
|
|
def get_available_variables(self, start_time, end_time, chart_configs=None, user_config=None):
|
|
"""Gets the available variables
|
|
|
|
(those that we can have a value for since the device has been installed
|
|
on the instrument) at the given point in time.
|
|
Here, a variable means : SECOP module name + parameter.
|
|
By default, this method returns the parameters "value" and "target",
|
|
unless the config files used in chart_configs or user_config indicates other directives.
|
|
|
|
Parameters :
|
|
start_time, send_time (int) : the unix timestamps in seconds of the point in time to get the variables at.
|
|
chart_configs ([ChartConfig] | None) :
|
|
an array of objects, each holding a configuration file for the chart.
|
|
Configurations are applied in the order of the list.
|
|
user_config ({(str):{"cat":(str), "color":(str), "unit":(str)}} | None) :
|
|
the Python dict representing the user configuration, applied at the end.
|
|
The key is <secop_module.parameter>.
|
|
|
|
Returns :
|
|
[{"tag":(str), "unit":(str), "curves":[{"name":(str), "label":(str), "color":(str)}]}] :
|
|
a list of dicts, each one representing
|
|
a block of curves with their name, their label and their color to display,
|
|
grouped by their category if given or unit (in tag).
|
|
"""
|
|
if start_time == end_time:
|
|
start_time = end_time - 3600
|
|
result = self.db.curves(start_time, end_time, _measurement=None,
|
|
merge='_measurement', **self.tags)
|
|
assert all(c.key_names[0] == '_measurement' for c in result.values())
|
|
variables = {k: t.tags.get('unit') for k, t in result.items()}
|
|
config = {}
|
|
if chart_configs:
|
|
for chart_config in chart_configs:
|
|
for key, cfg in ChartConfig(chart_config).variables.items():
|
|
config.setdefault(key, {}).update(cfg)
|
|
if user_config:
|
|
for key, cfg in user_config.items():
|
|
config.setdefault(key, {}).update(cfg)
|
|
groups = {}
|
|
|
|
def add_to_groups(name, cat=None, unit='1', color='', label=None):
|
|
if cat == '-':
|
|
return
|
|
if name.endswith('.value'):
|
|
if not cat:
|
|
cat = '*'
|
|
if not label:
|
|
label = name[:-6]
|
|
elif name.endswith('.target'):
|
|
if not cat:
|
|
cat = '*'
|
|
elif not cat:
|
|
return
|
|
unit = unit or '1'
|
|
tag = cat.replace('*', unit)
|
|
grp = groups.get(tag)
|
|
if grp is None:
|
|
crv_dict = {}
|
|
groups[tag] = {'tag': cat.replace('*', unit), 'unit': unit, 'curves': crv_dict}
|
|
else:
|
|
crv_dict = grp['curves']
|
|
crv_dict[name] = {'name': name, 'unit': unit, 'label': label or name}
|
|
|
|
# treat variables in config first (in their order!)
|
|
for key, cfg in config.items():
|
|
cat = cfg.pop('cat', None)
|
|
cfgunit = cfg.pop('unit', '')
|
|
if '.' in key:
|
|
unit = variables.pop(key, object)
|
|
if unit is not object:
|
|
add_to_groups(key, cat, cfgunit or unit, **cfg)
|
|
else:
|
|
var = f'{key}.value'
|
|
unit = variables.pop(var, object)
|
|
if unit is not object:
|
|
label = cfg.pop('label', None) or key
|
|
add_to_groups(var, cat, cfgunit or unit, label=label, **cfg)
|
|
var = f'{key}.target'
|
|
unit = variables.pop(var, object)
|
|
if unit is not object:
|
|
cfg.pop('color', None)
|
|
add_to_groups(var, cat, cfgunit or unit, **cfg)
|
|
for var, unit in variables.items():
|
|
add_to_groups(var, unit=unit)
|
|
# make order a bit more common
|
|
result = []
|
|
for key in ['K', 'T', 'W', 'ln/min'] + list(groups):
|
|
if key in groups:
|
|
group = groups.pop(key)
|
|
curve_dict = group['curves']
|
|
curves = []
|
|
# get first '.value' parameters and add targets if available
|
|
ordered_keys = [f'{m}.value' for m in ('tt', 'T', 'ts', 'Ts')]
|
|
for name in ordered_keys + list(curve_dict):
|
|
if name.endswith('.value'):
|
|
try:
|
|
curves.append(curve_dict.pop(name))
|
|
curves.append(curve_dict.pop(f'{name[:-6]}.target'))
|
|
except KeyError:
|
|
pass # skip not existing or already removed items
|
|
# add remaining curves
|
|
curves.extend(curve_dict.values())
|
|
# print(key, curves)
|
|
group['curves'] = curves
|
|
result.append(group)
|
|
return result
|
|
|
|
def w_updategraph(self):
|
|
"""Set the current visualisation mode to LIVE if not in HISTORICAL mode.
|
|
|
|
Called when the route /updategraph is reached.
|
|
Returns :
|
|
{"type":"accept-graph", "live": bool} : a dict with its "accept-graph" type and a "live"
|
|
value telling if the server could change its visualization mode to live
|
|
"""
|
|
logging.info("UPD GRAPH %d", self.livemode)
|
|
if self.livemode == self.HISTORICAL:
|
|
return dict(type='accept-graph', live=False)
|
|
else:
|
|
self.livemode = self.LIVE
|
|
return dict(type='accept-graph', live=True)
|
|
|
|
def w_export(self, variables, time, nan, interval, timeoffset=None):
|
|
"""
|
|
Returns the bytes of a dataframe with the curves given by variables in the time range "time"
|
|
Called when the route /export is reached.
|
|
|
|
Parameters :
|
|
variables (str) : a comma separataed value string of variable names (influx names) to retrieve
|
|
time (str) : a commma separated value string (range) of seconds.
|
|
nan (str) : the representation for NaN values in the TSV
|
|
interval (str) : the interval (resolution) of the values to get (string in seconds)
|
|
|
|
Returns :
|
|
io.BytesIO : an BytesIO object containing the dataframe to retrieve
|
|
"""
|
|
|
|
start, end = get_abs_time([float(t) for t in time.split(',')])
|
|
start, end = int(start), ceil(end)
|
|
|
|
queried_variables = variables.split(',')
|
|
interval = float(interval) if interval else None
|
|
timeoffset = None if timeoffset == 'now' else (timeoffset or 0)
|
|
result = self.db.export(start, end, queried_variables, timeoffset=timeoffset, none=nan, interval=interval,
|
|
**self.tags)
|
|
return io.BytesIO(result.encode('utf-8'))
|
|
|
|
def graphpoll(self):
|
|
"""
|
|
Polls the last known values for all the available variables, and returns only those whose polled values
|
|
are more recent than the most recent displayed one.
|
|
Every plain minute, all the variables are returned with a point having their last known value at the current
|
|
timestamp to synchronize all the curves on the GUI.
|
|
|
|
Returns :
|
|
{"type":"graph-update", "time":(int), "graph":{(str):[[(int),(float)]]}} | None :
|
|
a dictionary with its "graph-update" type
|
|
(so it can be processed by the client), and a "graph" dictionary with the variable names as key,
|
|
and an array of points, which are an array containing the timestamp
|
|
as their first value, and the y-value in float as their second one
|
|
"""
|
|
if self.livemode != self.LIVE:
|
|
return None
|
|
now = current_time()
|
|
if now < int(self.last_update) + 1.5:
|
|
# the server is only waiting after a None return
|
|
# this avoids to many queries with expected empty result
|
|
return None
|
|
last_time = int(min(self.last_time.values(), default=now-3600))
|
|
# if len(self.last_time) > 1:
|
|
# print('time_poll_jitter', max(self.last_time.values()) - min(self.last_time.values()))
|
|
prev_minute, self.last_minute = self.last_minute, now // 60
|
|
fullminute = prev_minute != self.last_minute
|
|
add_prev = 3600 if fullminute else 0
|
|
result = self.db.curves(last_time, None, list(self.last_values),
|
|
merge='_measurement', add_prev=add_prev, **self.tags)
|
|
to_remove = {}
|
|
for key, curve in result.items():
|
|
tlast = self.last_values.get(key, [0])[0]
|
|
# remove points older than the last known point. this might happen for different reasons:
|
|
# - queries are rounded to seconds
|
|
# - clocks of different streams might not be synched
|
|
l = len(curve)
|
|
for i, row in enumerate(curve):
|
|
if row[0] > tlast:
|
|
del curve[:i]
|
|
break
|
|
else:
|
|
if not fullminute:
|
|
to_remove[key] = l
|
|
self.update_last(result)
|
|
if fullminute:
|
|
self.db.complete(result, self.last_time, 'stream')
|
|
for key, length in to_remove.items():
|
|
curve = result[key]
|
|
if len(curve) > l:
|
|
del curve[:l]
|
|
else:
|
|
# if fullminute:
|
|
# print('R', key)
|
|
result.pop(key)
|
|
# print('poll', sum(len(c) for c in result.values()), self.last_time)
|
|
if len(result) > 0:
|
|
self.last_update = now
|
|
return dict(type='graph-update', time=last_time, graph=result)
|
|
return None
|
|
|
|
|
|
# class InfluxInstrument(HandlerBase):
|
|
#
|
|
# def __init__(self, instr_name, inst_config=None):
|
|
# super().__init__()
|
|
# self.db = InfluxDB()
|
|
# # self.influx_data_getter = InfluxDataGetter(self.db, instr_name)
|
|
# self.title = instr_name
|
|
# self.device = self.influx_data_getter.get_device_name(int(current_time()))
|
|
#
|
|
# def new_client(self):
|
|
# return self.register(InfluxClient(self))
|
|
|
|
|
|
class InfluxParams:
|
|
"""Class with dummy routes, in case client side is started with the right part init commands"""
|
|
def __init__(self):
|
|
self.id = uuid.uuid4().hex[0:15]
|
|
self.queue = []
|
|
|
|
def info(self):
|
|
return ["na"]
|
|
|
|
def w_getblock(self, path):
|
|
return dict(type='draw', title="graph", path=path, components=[])
|
|
|
|
def w_updateblock(self, path):
|
|
return dict(type='accept-block')
|
|
|
|
def w_console(self):
|
|
return dict(type='accept-console')
|
|
|
|
def w_sendcommand(self, command):
|
|
return dict(type='accept-command')
|
|
|
|
|
|
# class InfluxClient(InfluxParams, InfluxGraph):
|
|
# def __init__(self, instrument):
|
|
# InfluxParams.__init__(self)
|
|
# InfluxGraph.__init__(self, instrument)
|
|
#
|
|
# def poll(self):
|
|
# messages = self.queue
|
|
# self.queue = []
|
|
# msg = self.graphpoll()
|
|
# if msg:
|
|
# messages.append(msg)
|
|
# return messages
|
|
#
|
|
#
|
|
# class SecopInfluxClient(SecopClient, InfluxGraph):
|
|
# def __init__(self, instrument):
|
|
# SecopClient.__init__(self, instrument)
|
|
# InfluxGraph.__init__(self, instrument)
|
|
#
|
|
# def poll(self):
|
|
# messages = super().poll()
|
|
# msg = self.graphpoll()
|
|
# if msg:
|
|
# messages.append(msg)
|
|
# return messages
|
|
#
|
|
#
|
|
# class SecopInfluxInstrument(SecopInstrument):
|
|
#
|
|
# def __init__(self, inst_name, instrument_config):
|
|
# super().__init__(inst_name, instrument_config)
|
|
# config = ConfigParser()
|
|
# config.optionxform = str
|
|
# config.read("./config/influx.ini")
|
|
# section = config["INFLUX"]
|
|
# self.db = SEHistory()
|
|
# # self.db = InfluxDBWrapper(uri=section["url"], token=section["token"],
|
|
# # org=section["org"], bucket=section['bucket'])
|
|
# # self.influx_data_getter = InfluxDataGetter(self.db, inst_name)
|
|
# # self.device = self.influx_data_getter.get_device_name(int(current_time()))
|
|
#
|
|
# def get_streams(self, timestamp=None):
|
|
# return self.db.get_streams(None, timestamp)
|
|
#
|
|
# def get_experiments(self, start=None, stop=None):
|
|
# return self.db.get_experiments(start, stop)
|
|
|