336 lines
16 KiB
Python
336 lines
16 KiB
Python
from influxdb_client import InfluxDBClient
|
|
from configparser import ConfigParser
|
|
import ast
|
|
from datetime import datetime
|
|
|
|
class InfluxDB:
    """
    Class used to handle the connection with the InfluxDB instance.

    The connection settings (url, token, org) are read from the "INFLUX" section
    of the file ./config/config.ini when the object is created.
    """

    def __init__(self):
        """
        Reads the configuration file and creates the InfluxDB client.

        Raises :
            KeyError : if the "INFLUX" section or one of its keys (url, token, org) is not available.
                ConfigParser.read() ignores files it cannot open, so a missing file ends up here as well.
        """
        config = ConfigParser()
        config.read("./config/config.ini")
        influx_section = config["INFLUX"]
        self._client = InfluxDBClient(
            url=influx_section["url"],
            token=influx_section["token"],
            org=influx_section["org"],
        )

    def disconnect(self):
        """
        Disconnects from the InfluxDB instance by closing the underlying client.
        """
        self._client.close()

    def disconnet(self):
        """
        Disconnects from the InfluxDB instance.

        Original (misspelled) name of disconnect(). It is kept so that code calling
        this name keeps working; it simply delegates to disconnect().
        """
        self.disconnect()

    def query(self, query_str):
        """
        Executes the query on the InfluxDB instance.

        Parameters :
            query_str (string) : the Flux query to execute

        Returns :
            TableList : an InfluxDB list of the tables returned by the query
        """
        return self._client.query_api().query(query_str)
|
|
|
|
class PrettyFloat(float):
    """
    A float whose repr() is limited to 15 significant digits.

    Many values originally have a small, fixed number of decimal digits. Because
    the binary representation is not exact, the default representation may carry
    superfluous digits. Rendering with 15 significant digits drops them, which is
    meant to keep serialized output (the original note mentions JSON) short:

        repr(1/10*3) == '0.30000000000000004'
        repr(PrettyFloat(1/10*3)) == '0.3'
    """

    def __repr__(self):
        # "g" formatting with precision 15 removes trailing zeros and the
        # rounding noise beyond the 15th significant digit.
        return format(self, ".15g")
|
|
|
|
class InfluxDataGetter:
    """
    Retrieves the available variables and their curves for one instrument out of InfluxDB.

    All requests are built as Flux query strings and executed through the query() method
    of the database object given to the constructor.
    """

    def __init__(self, db, influx_instrument_config):
        """
        Parameters :
            db : the object used to run the Flux queries. Only its query(query_str) method is used, and its
                result is read as a list of tables, each having a "records" list.
            influx_instrument_config : a mapping from which the keys "bucket", "measurement_prefix" and
                "setup_info_prefix" are read.
        """
        self._influx_instrument_config = influx_instrument_config
        self._bucket = self._influx_instrument_config["bucket"]
        self._db = db

    # ----- PUBLIC METHODS

    def get_available_variables_at_time(self, time):
        """
        Gets the available variables (those that we can have a value for since the device has been installed on the instrument) at the given point in time.

        Parameters :
            time (int) : the unix timestamp in seconds of the given point in time

        Returns :
            [{"tag":(str), "unit":(str), "curves":[{"name":(str), "label":(str), "color":(str)}]}] : a list of dictionaries, each one representing
            a block of curves with their name, their label and their color to display, grouped by their unit (the tag is currently set to the unit).
            The list is empty if no setup_info record is found.
        """
        setup_info = self._get_setup_info_as_dict(time)
        available_variables = self._extract_variables(setup_info)
        available_variables = self._remove_variables_without_value_float(available_variables, time)
        available_variables = self._set_variables_with_target(available_variables, time)
        return self._group_variables_by_unit(available_variables)

    def get_curves_in_timerange(self, variables, time):
        """
        Gets the curves for the given variables within a timerange.

        Parameters :
            variables ([(str)]) : an array of variable names (Influx) to get the curves for. A name ending with ".target"
                selects the target values of the variable named by the part before the suffix.
            time ([int]) : the timerange we want the values in. It consists of two Unix timestamps in seconds, used as
                range(start: time[0], stop: time[1] + 1) in the Flux query.

        Returns :
            {(str):[[(int), (float)]]} : a dictionary of curves. The key is the name given in variables, and the value is an array of pairs (also arrays),
            the first value being the Unix timestamp in seconds (x), the second being the value (y), or None if the stored value is not a number.
        """
        target_suffix = ".target"
        res = {}
        for variable in variables:
            is_target = variable.endswith(target_suffix)
            measurement = variable[:-len(target_suffix)] if is_target else variable
            res[variable] = self._get_curve(measurement, is_target, time)
        return res

    # ----- PRIVATE METHODS

    @staticmethod
    def _first_record(tables):
        """
        Returns the first record of the first table that has records, or None if the query result holds no record at all.

        Parameters :
            tables : the list of tables returned by a query
        """
        for table in tables:
            if table.records:
                return table.records[0]
        return None

    @staticmethod
    def _to_pretty_float(value):
        """
        Converts a raw record value into a PrettyFloat.

        Returns :
            PrettyFloat : the converted value, or None when the value cannot be converted to a float (None, non numeric string, too large integer).
        """
        try:
            return PrettyFloat(value)
        except (TypeError, ValueError, OverflowError):
            # Only conversion failures are turned into None; any other error is propagated.
            return None

    def _get_field_keys(self, measurement, time):
        """
        Gets the names of the fields of the given measurement, looking from timestamp 0 up to the given point in time.

        Parameters :
            measurement (str) : the name (Influx) of the measurement
            time (int) : the unix timestamp in seconds up to which the fields are looked for (stop is time + 1 in the query)

        Returns :
            {(str)} : the set of field names, empty if the query returns nothing for this measurement
        """
        # NOTE(review): measurement is interpolated verbatim into the Flux source; confirm it never comes from untrusted input.
        query = f"""
            import "influxdata/influxdb/schema"
            schema.measurementFieldKeys(bucket: "{self._bucket}", measurement: "{measurement}", start:0, stop: {time + 1})
                |> yield(name: "res")
        """
        return {record.get_value() for table in self._db.query(query) for record in table.records}

    def _get_setup_info_as_dict(self, time):
        """
        Gets the value of the field setup_info in the measurement nicos/se_main as a Python dict.

        Parameters :
            time (int) : the Unix timestamp in seconds we want to look the availability of variables for.

        Returns :
            {(str):((str), {...})} : the parsed "setup_info dict". The key is the secop_module prefixed with "se_", and the value is a tuple with its first value
            being the type of Secop device for this module, and the second value is too big to give its signature. Some tuple examples can be found under graphs/setup_info_examples.
            An empty dict is returned when there is no setup_info record up to the given time, or when its value is empty.
        """
        query = f"""
            from(bucket: "{self._bucket}")
                |> range(start: 0, stop: {time + 1})
                |> filter(fn: (r) => r._measurement == "nicos/se_main")
                |> filter(fn: (r) => r._field == "setup_info")
                |> last()
                |> yield(name: "res")
        """
        record = self._first_record(self._db.query(query))
        if record is None:
            return {}
        raw_setup_info = record.get_value()
        if not raw_setup_info:
            # TODO : do we try to look for another way of getting the variables when setup_info is missing or empty ?
            return {}
        # literal_eval only accepts Python literals, so the stored text is parsed without being executed.
        return ast.literal_eval(raw_setup_info)

    def _extract_variables(self, setup_info_dict):
        """
        Extracts relevant information out of the setup_info dict for each available variable.

        Entries whose device type is "nicos.devices.secop.devices.SecopDevice" are skipped.

        Parameters :
            setup_info_dict ({(str):((str), {...})}) : the parsed "setup_info dict". The key is the secop_module prefixed with "se_", and the value is a tuple with its first value
            being the type of Secop device for this module, and the second value is too big to give its signature. Some tuple examples can be found under graphs/setup_info_examples.

        Returns :
            [{"name":(str), "label":(str), "unit":(str), "has_potential_target":(bool)}] : an array of dictionaries, each containing the Influx name of the corresponding variable out of the setup_info dict,
            the label to display in the Web GUI, its unit and a boolean value indicating if the variable has a potential target available (a "target_datainfo" key is present).
        """
        return [
            {
                "name": self._transform_setup_info_variable_name_to_influx(setup_info_variable_name),
                "label": content[1]["secop_module"],
                "unit": content[1]["unit"],
                "has_potential_target": "target_datainfo" in content[1],
            }
            for setup_info_variable_name, content in setup_info_dict.items()
            if content[0] != "nicos.devices.secop.devices.SecopDevice"
        ]

    def _transform_setup_info_variable_name_to_influx(self, setup_info_name):
        """
        Transforms the name of the variable available in the setup_info dict into the Influx name.

        The name is lowercased, its first len(setup_info_prefix) characters are dropped and the measurement prefix is put in front.

        Parameters :
            setup_info_name (str) : the name of the variable in the setup_info dict.

        Returns :
            str : the transformed variable name that matches the Influx names requirements
        """
        config = self._influx_instrument_config
        return config["measurement_prefix"] + setup_info_name.lower()[len(config["setup_info_prefix"]):]

    def _remove_variables_without_value_float(self, available_variables, time):
        """
        Removes some of the previously identified available_variables if they effectively do not have a value_float field in InfluxDB.

        Parameters :
            available_variables ([{"name":(str), "label":(str), "unit":(str), "has_potential_target":(bool)}]) : an array of dictionaries, each containing the Influx name of the corresponding variable out of the setup_info dict,
            the label to display in the Web GUI, its unit and a boolean value indicating if the variable has a potential target available.
            time (int): the unix timestamp in seconds at which we want to get the available variables.

        Returns :
            [{"name":(str), "label":(str), "unit":(str), "has_potential_target":(bool)}] : a new array holding only the dictionaries of the variables having a value_float field.
            Variables for which Influx returns no field at all are removed too.
        """
        return [
            variable
            for variable in available_variables
            if "value_float" in self._get_field_keys(variable["name"], time)
        ]

    def _set_variables_with_target(self, available_variables, time):
        """
        Determines if the previously identified available_variables have effectively a target or not (meaning it has a target_float field in Influx).

        The given dictionaries are updated in place.

        Parameters :
            available_variables ([{"name":(str), "label":(str), "unit":(str), "has_potential_target":(bool)}]) : an array of dictionaries, each containing the Influx name of the corresponding variable out of the setup_info dict,
            the label to display in the Web GUI, its unit and a boolean value indicating if the variable has a potential target available.
            time (int): the unix timestamp in seconds at which we want to get the available variables (for the target).

        Returns :
            [{"name":(str), "label":(str), "unit":(str), "has_potential_target":(bool)}] : the same array, where "has_potential_target" has been set to False
            for the variables without a target_float field.
        """
        for variable in available_variables:
            # The fields are only queried for the variables that may have a target.
            if variable["has_potential_target"] and "target_float" not in self._get_field_keys(variable["name"], time):
                variable["has_potential_target"] = False
        return available_variables

    def _group_variables_by_unit(self, available_variables):
        """
        Performs a group by unit, while removing useless information and adding target curves.

        For a variable with a target, the target curve is added right before the curve of the variable itself.

        Parameters :
            available_variables ([{"name":(str), "label":(str), "unit":(str), "has_potential_target":(bool)}]) : an array of dictionaries, each containing the Influx name of the corresponding variable out of the setup_info dict,
            the label to display in the Web GUI, its unit and a boolean value indicating if the variable has a target available.

        Returns :
            [{"tag":(str), "unit":(str), "curves":[{"name":(str), "label":(str), "color":(str)}]}] : the groups, in the order their unit was first met
        """
        groups = {}
        for available_variable in available_variables:
            if available_variable["has_potential_target"]:
                self._append_variable(groups, self._get_formatted_target_variable(available_variable))
            self._append_variable(groups, available_variable)
        return list(groups.values())

    def _get_formatted_target_variable(self, variable):
        """
        Formats the variable which has a target to be added to the unit groups.

        Parameters :
            variable ({"name":(str), "label":(str), "unit":(str), "has_potential_target":(bool)}) : a dictionary containing the Influx name of the corresponding variable out of the setup_info dict,
            the label to display in the Web GUI, its unit and a boolean value indicating if the variable has a target available.

        Returns :
            {"name":(str), "label":(str), "unit":(str)} : a dictionary containing the Influx name augmented with the ".target" suffix, the label to display
            (the Influx name without its first len(measurement_prefix) characters, augmented with ".target") and the unit. The "has_potential_target" value is not copied.
        """
        prefix_length = len(self._influx_instrument_config["measurement_prefix"])
        influx_name = variable["name"]
        return {
            "name": influx_name + ".target",
            "label": influx_name[prefix_length:] + ".target",
            "unit": variable["unit"],
        }

    def _append_variable(self, groups, variable):
        """
        Appends the variable in the unit group with an (empty) color, and creates the unit group if not available.

        Parameters :
            groups ({}) : a dictionary that contains the curves grouped by unit, which will be updated
            variable ({"name":(str), "label":(str), "unit":(str)[,"has_potential_target":(bool)]}) : a dictionary containing the Influx name of the corresponding variable out of the setup_info dict,
            the label to display in the Web GUI, its unit and possibly a boolean value indicating if the variable has a target available.
        """
        unit = variable["unit"]
        group = groups.setdefault(unit, {"tag": unit, "unit": unit, "curves": []})
        group["curves"].append({
            "name": variable["name"],
            "label": variable["label"],
            "color": "",
        })

    def _get_curve(self, variable, is_target, time):
        """
        Gets the points (curve) within a timerange for the given variable.

        Parameters :
            variable (str) : the name (Influx) of the variable we want the values of, without the ".target" suffix.
            is_target (bool) : tells if the target_float field (True) or the value_float field (False) has to be read
            time ([(int)]) : the timerange we want the values in. It consists of two Unix timestamps in seconds, used as
                range(start: time[0], stop: time[1] + 1) in the Flux query.

        Returns :
            [[(int), (float)]] : an array of pairs (also arrays), the first value being the Unix timestamp in seconds (x), the second being the value (y),
            or None if the stored value is not a number. The last known value at time[0] may be added as the first point (see _insert_last_know_value).
        """
        field = "target_float" if is_target else "value_float"
        # NOTE(review): variable is interpolated verbatim into the Flux source; confirm callers only pass trusted measurement names.
        query = f"""
            from(bucket: "{self._bucket}")
                |> range(start: {time[0]}, stop: {time[1] + 1})
                |> filter(fn : (r) => r._measurement == "{variable}")
                |> filter(fn : (r) => r._field == "{field}")
                |> keep(columns: ["_time","_value"])
                |> yield(name: "res")
        """
        res = [
            [int(datetime.timestamp(record.get_time())), self._to_pretty_float(record.get_value())]
            for table in self._db.query(query)
            for record in table.records
        ]
        return self._insert_last_know_value(variable, is_target, res, time)

    def _insert_last_know_value(self, variable, is_target, curve, time):
        """
        Adds the last known value as the first point in the curve if the curve does not already start at time[0].

        Parameters :
            variable (str) : the name (Influx) of the variable we want the values of, without the ".target" suffix.
            is_target (bool) : tells if the target_float field (True) or the value_float field (False) has to be read
            curve ([[(int), (float)]]) : an array of pairs (arrays), the first value being the Unix timestamp in seconds (x), the second being the value (y)
            time ([(int)]) : the timerange we want the values in. Only time[0] is used here.

        Returns :
            [[(int), (float)]] : the curve parameter (same list), updated with a potential new first point [time[0], last known value].
            The curve is left untouched when no value is known up to time[0].
        """
        if curve and curve[0][0] == time[0]:
            return curve

        field = "target_float" if is_target else "value_float"
        query = f"""
            from(bucket: "{self._bucket}")
                |> range(start: 0, stop: {time[0]+1})
                |> filter(fn : (r) => r._measurement == "{variable}")
                |> filter(fn : (r) => r._field == "{field}")
                |> last()
                |> keep(columns: ["_value"])
                |> yield(name: "res")
        """
        record = self._first_record(self._db.query(query))
        if record is not None:
            curve.insert(0, [time[0], self._to_pretty_float(record.get_value())])
        return curve