724 lines
39 KiB
Python
724 lines
39 KiB
Python
from influxdb_client import InfluxDBClient
|
|
from configparser import ConfigParser
|
|
import ast
|
|
from datetime import datetime
|
|
from pandas import DataFrame as df, merge_ordered
|
|
from numpy import NaN
|
|
|
|
# Prefix prepended to lowered SECoP module names to build Influx measurement names
# (see InfluxDataGetter._transform_secop_module_name_to_influx). Note: "MEASURMENT"
# is a historical misspelling kept for compatibility.
MEASURMENT_PREFIX = "nicos/se_"
# The bucket queried is this prefix + the instrument name (see InfluxDataGetter.__init__).
BUCKET_PREFIX = "nicos-cache-"
|
|
|
|
class InfluxDB:
    """
    Class used to handle the connection with the InfluxDB instance.

    Connection parameters (url, token, org) are read from the [INFLUX]
    section of ./config/influx.ini (path relative to the working directory).

    Attributes :
        _client (InfluxDBClient) : the InfluxDB client
    """

    def __init__(self):
        config = ConfigParser()
        config.read("./config/influx.ini")
        influx = config["INFLUX"]
        self._client = InfluxDBClient(url=influx["url"], token=influx["token"],
                                      org=influx["org"])

    def disconnect(self):
        """
        Disconnects from the InfluxDB instance
        """
        self._client.close()

    # Backward-compatible alias: the method was originally published under this
    # misspelled name, so existing callers must keep working.
    disconnet = disconnect

    def query(self, query_str):
        """
        Executes the query on the InfluxDB instance

        Parameters :
            query_str (string) : the Flux query to execute

        Returns :
            TableList : an InfluxDB list of the tables returned by the query
        """
        return self._client.query_api().query(query_str)

    def query_data_frame(self, query_str):
        """
        Executes the query on the InfluxDB instance and gets the response as a pandas DataFrame

        Parameters :
            query_str (string) : the Flux query to execute

        Returns :
            DataFrame : the query response as a DataFrame
        """
        return self._client.query_api().query_data_frame(query_str)
|
|
|
|
class PrettyFloat(float):
    """saves bandwidth when converting to JSON

    a lot of numbers originally have a fixed (low) number of decimal digits
    as the binary representation is not exact, it might happen, that a
    lot of superfluous digits are transmitted:

    str(1/10*3) == '0.30000000000000004'
    str(PrettyFloat(1/10*3)) == '0.3'
    """

    def __repr__(self):
        # 15 significant digits round away binary-representation noise
        return format(self, '.15g')
|
|
|
|
class InfluxDataGetter:
    """
    Class used to get data from InfluxDB.

    Attributes :
        _bucket (str) : the name of the InfluxDB bucket to query (used for all queries)
        _db (InfluxDB) : the InfluxDB instance of the database to query
    """

    def __init__(self, db, instrument_name):
        """
        Parameters :
            db (InfluxDB) : the InfluxDB instance of the database to query
            instrument_name (str) : the name of the instrument from which the data will be got
        """
        # one bucket per instrument: "nicos-cache-<instrument_name>"
        self._bucket = BUCKET_PREFIX + instrument_name
        self._db = db
|
|
|
|
# ----- PUBLIC METHODS
|
|
|
|
def get_available_variables_at_time(self, time, chart_configs = None, user_config = None):
    """
    Gets the available variables (those that we can have a value for since the device has been installed on the instrument) at the given point in time.
    Here, a variable means : SECOP module name + parameter. By default, this method returns the parameters "value" and "target", unless the config files used in chart_configs or user_config indicates other directives.

    Parameters :
        time (int) : the unix timestamps in seconds of the point in time to get the variables at.
        chart_configs ([ChartConfig] | None) : an array of objects, each holding a configuration file for the chart. Configurations are applied in the order of the list.
        user_config ({(str):{"cat":(str), "color":(str), "unit":(str)}} | None) : the Python dict representing the user configuration, applied at the end. The key is <secop_module.parameter>.

    Returns :
        [{"tag":(str), "unit":(str), "curves":[{"name":(str), "label":(str), "color":(str)}]}] : a list of dictionnaries, each one representing
        a block of curves with their name, their label and their color to display, grouped by their category if given or unit (in tag).
    """
    all_setup_info = self._get_all_setup_info_as_dict(time)
    available_variables = self._extract_variables(all_setup_info)

    # Apply the chart configuration files first (in list order), then the user
    # configuration on top, so user choices win.
    if chart_configs is not None:
        for chart_config in chart_configs:
            available_variables = self._filter_params_with_config(available_variables, chart_config)
    if user_config is not None:
        available_variables = self._filter_params_with_user_config(available_variables, user_config)

    # Drop hidden parameters, keep only parameters backed by a <param>_float
    # field in Influx, then group the remaining curves for display.
    available_variables = self._remove_variables_params_not_displayed(available_variables)
    available_variables = self._remove_variables_params_wihout_param_float_and_split(available_variables, time)
    return self._group_variables_by_cat_unit(available_variables)
|
|
|
|
def get_curves_in_timerange(self, variables, time, interval = None):
    """
    Gets the curves for the given variables within a timerange.

    Parameters :
        variables ([(str)]) : an array of variable names (Influx) to get the curves for
        time ([int]) : the timerange we want the values in. It consists of two values which are Unix timestamps in seconds, first included, second excluded.
        interval (int|None) : the interval (resolution) of the values to get (in nanoseconds)

    Returns :
        {(str):[[(int), (float)]]} : a dictionnary of curves. The key is the name of the influx variable, and the value is an array of pairs (also arrays), the first value being the Unix timestamp in second (x), the seconds being the value (y).
    """
    curves = {}
    for name in variables:
        # "<measurement>.<param>"; a bare measurement name means the main "value" parameter
        parts = name.split(".")
        parameter = parts[1] if len(parts) > 1 else "value"
        curves[name] = self._get_curve(parts[0], parameter, time, interval)
    return curves
|
|
|
|
def poll_last_values(self, variables, lastvalues, end_time):
    """
    Polls the lastest values for the given variables since their last known point to end_time.

    Parameters :
        variables ([(str)]) : an array of variable names (Influx) to get the last known values for
        lastvalues ({(str):((float), (float))}) : a dictionnary of tuples, first value being the floating Unix timestamp in seconds (precision = ms) of the last known value for the curve, and the second value being the associated value, indexed by the curve name
        end_time (int) : the Unix timestamp in seconds of the last point in time to include the values in

    Returns :
        {(str):[[(int), (float)]]} : a dictionnary of points. The key is the name of the influx variable, and the value is an array of pairs (also array), the first value being the Unix timestamp in second (x), the seconds being the value (y).
    """
    res = {}
    for variable in variables:
        var_param = variable.split(".")
        variable_name_for_query = var_param[0]
        parameter = "value" if len(var_param) == 1 else var_param[1]

        # If the call to poll_last_values is more recent than getgraph, we trigger
        # only one value, which is the last one available from 0 to end_time.
        last = lastvalues.get(variable)
        start_time = int(last[0]) if last is not None else None

        # start_time might be more recent than end_time if getgraph is called
        # between graphpoll and poll_last_values (influxgraph.py)
        if start_time is None or start_time < end_time:
            points = self._get_last_values(variable_name_for_query, parameter, start_time, end_time)
            if points:
                res[variable] = points
    return res
|
|
|
|
def get_device_name(self, time):
    """
    Gets the device name available at time with stick and addons

    Parameters :
        time (int) : the Unix timestamp in seconds of the time we want the device name

    Returns :
        str : the device name
    """
    # main/stick/addons components joined with "/"
    return "/".join(self._get_device_name_components(time))
|
|
|
|
def get_curves_data_frame(self, variables, times, interval, variables_name_label_map=None):
    """
    Gets the curves for the given variables within a timerange times, as a pandas dataframe.
    All curves are on a single common time axis.
    The first column is called "relative", and consists of floating seconds, relative to the beginning of the query.
    The "timestamp" column (absolute floating UNIX timestamps in seconds, precise to the nanosecond) is the last one.
    If a curve does not have a point at a given point, the last known value for this curve is used.
    If a curve gets expired, it is filled with NaN value for the corresponding expired time windows.
    The first value (for each variable) is the last known value before the time interval.

    Parameters :
        variables ([(str)]) : an array of variable names (Influx) to get the curves for.
        times ([int]) : the timerange we want the values in. It consists of two values which are Unix timestamps in seconds, first included, second excluded.
        interval (int) : the interval (resolution) of the values to get (in seconds). Allows data binning.
            NOTE(review): this method compares interval against the *string* "None"
            (not the None object), so it is apparently passed as a string here,
            unlike the int|None used in get_curves_in_timerange - confirm with callers.
        variables_name_label_map ({(str):(str)} | None) : a dictionnary containing curve labels, indexed by INFLUX names. The corresponding label will be used in the TSV header for each variable if found, else the influx name.

    Returns :
        pandas.DataFrame : the curves in a single pandas DataFrame
    """
    variables_info = {}
    for variable in variables:
        # "<measurement>.<param>"; a bare measurement name means the main "value" parameter
        var_param = variable.split(".")
        variable_name_for_query = var_param[0]
        parameter = "value" if len(var_param) == 1 else var_param[1]

        # we need to rename the "_time" column to simply "time" in case we want binning because of the comparison done later in the "binned points with same timestamp" process.
        # chr(34) is the double quote char, because we cannot escape them in a f string
        query = f"""
            from(bucket: "{self._bucket}")
                |> range(start: {times[0]}, stop: {times[1] + 1})
                |> filter(fn : (r) => r._measurement == "{variable_name_for_query}")
                |> filter(fn : (r) => r._field == "{parameter+"_float"}")
                {"|> aggregateWindow(every: duration(v: "+ str(self._seconds_to_nanoseconds(interval))+"), fn: last, createEmpty:false, timeDst:"+chr(34)+"binning_time"+chr(34)+")" if interval != "None" else ""}
                |> map(fn: (r) => ({{r with relative: ( float(v: uint(v: {"r.binning_time" if interval != "None" else "r._time"}) - uint(v:{self._seconds_to_nanoseconds(times[0])})) / 1000000000.0 )}}))
                |> map(fn: (r) => ({{r with timestamp: float(v: uint(v: {"r.binning_time" if interval != "None" else "r._time"})) / 1000000000.0}}))
                {"|> rename(columns: {_time:"+chr(34)+"time"+chr(34)+"})" if interval != "None" else ""}
                |> drop(columns:["_start", "_stop", "_field"])
                |> pivot(rowKey:["relative", "timestamp", "expired"{", "+chr(34)+"time"+chr(34) if interval != "None" else ""}], columnKey: ["_measurement"], valueColumn: "_value")
        """
        data_frame = self._db.query_data_frame(query)

        # If there is a binning asked, there can be two points with the same timestamp/relative value, because points with expired=True and expired=False can be found in the same interval, leading to two rows, one with last True, and one with last False.
        # For each identified couple, we look at the real timestamp of the points used to feed this interval. We then keep the value which is the more recent in this interval.
        if interval != "None" and not data_frame.empty:
            # we first identify the expired points
            expired_rows = data_frame.loc[data_frame["expired"] == "True"]

            # we use itertuples to preserve the pandas dtypes, so comparisons can be done. "tuple" is a Python named tuple
            for expired_point_tuple in expired_rows.itertuples():
                # Then, we identify if there is a valid point with the same relative time as the current expired point
                corresponding_valid_point = data_frame.loc[(data_frame["expired"] == "False") & (data_frame["relative"] == expired_point_tuple.relative)]
                # we make sure that there is only one corresponding valid point, even if in theory, there will never be more than one corresponding valid point
                if not corresponding_valid_point.empty and len(corresponding_valid_point.index) == 1:
                    # if we did not rename "_time" to "time" sooner in the query, "_time" would have been renamed to a positionnal name (because it starts with a _, see itertuples() doc), making confusion while reading the code
                    if corresponding_valid_point.iloc[0]["time"] > expired_point_tuple.time:
                        data_frame.drop(expired_point_tuple.Index, inplace=True)
                    else:
                        data_frame.drop(corresponding_valid_point.index, inplace=True)
            # we do not need the "time" column anymore
            data_frame.drop(["time"], axis=1, inplace=True)

        # Needed for last known value
        query_last_known = f"""
            from(bucket: "{self._bucket}")
                |> range(start: 0, stop: {times[0] + 1})
                |> filter(fn : (r) => r._measurement == "{variable_name_for_query}")
                |> filter(fn : (r) => r._field == "{parameter+"_float"}")
                |> last()
                |> map(fn: (r) => ({{r with relative: 0.0}}))
                |> map(fn: (r) => ({{r with timestamp: float(v: uint(v: r._time)) / 1000000000.0}}))
                |> drop(columns:["_start", "_stop", "_field"])
                |> pivot(rowKey:["relative", "timestamp", "expired"], columnKey: ["_measurement"], valueColumn: "_value")
        """
        data_frame_last_known = self._db.query_data_frame(query_last_known)

        # Pick the most recent "last known" row across the returned tables.
        row_to_insert = None
        for index, row in data_frame_last_known.iterrows():
            try: #needed because row_to_insert == None is not possible
                if row_to_insert.empty or row["timestamp"] > row_to_insert["timestamp"]:
                    row_to_insert = row
            except:
                # first iteration: row_to_insert is still None, adopt the first row
                row_to_insert = row
        try: #row_to_insert might be None
            if not row_to_insert.empty :
                # anchor the last known value at the window start
                row_to_insert["timestamp"] = float(times[0])
                if data_frame.empty:
                    data_frame = row_to_insert.to_frame().T
                else:
                    data_frame.loc[-1] = row_to_insert
        except:
            # no last known value at all: keep data_frame as is
            pass

        # no data for this variable in the window and no last known value: skip it
        if data_frame.empty:
            continue

        # rename the measurement column to the display label (or keep the influx name)
        variable_df_column_name = variables_name_label_map.get(variable, variable) if not variables_name_label_map == None else variable
        data_frame.rename(columns={variable_name_for_query : variable_df_column_name}, inplace=True)
        data_frame.drop(["result", "table"], axis=1, inplace=True)
        data_frame.sort_values(by=["timestamp"], inplace=True)
        # NOTE(review): reset_index() is not in-place and its result is discarded here;
        # presumably inplace=True (or reassignment) was intended - confirm.
        data_frame.reset_index()

        variables_info[variable_df_column_name] = {}
        variables_info[variable_df_column_name]["expired_ranges"] = []

        # Identify time windows for which the curve is expired:
        # an expired row opens a range [start], the next valid row closes it [start, end].
        for index, row in data_frame.iterrows():
            if row["expired"] == "True":
                data_frame.loc[index, variable_df_column_name] = NaN
                variables_info[variable_df_column_name]["expired_ranges"].append([row["timestamp"]])
            elif row["expired"] == "False":
                if len(variables_info[variable_df_column_name]["expired_ranges"]) > 0 and len(variables_info[variable_df_column_name]["expired_ranges"][-1]) == 1:
                    variables_info[variable_df_column_name]["expired_ranges"][-1].append(row["timestamp"])
        # NOTE(review): result discarded again, see above.
        data_frame.reset_index()
        data_frame.drop(["expired"], axis=1, inplace=True)
        variables_info[variable_df_column_name]["df"] = data_frame
    res = None
    non_empty_variables = list(variables_info.keys())

    # Merge single curve dataframes to a global one
    if len(non_empty_variables) == 0:
        return df()
    elif len(non_empty_variables) == 1:
        res = variables_info[non_empty_variables[0]]["df"]
    else :
        for i in range(0, len(non_empty_variables)):
            if i == 1:
                res = merge_ordered(variables_info[non_empty_variables[0]]["df"], variables_info[non_empty_variables[1]]["df"], on=["timestamp", "relative"], suffixes=(None, None))
            elif i > 1:
                res = merge_ordered(res, variables_info[non_empty_variables[i]]["df"], on=["timestamp", "relative"], suffixes=(None, None))

    # Forward fill missing values, then set data points to NaN for those which are expired
    if len(non_empty_variables) > 1:
        res.ffill(inplace=True)
    for variable, info in variables_info.items():
        for expired_range in info["expired_ranges"]:
            # half-open range [start, end); an unclosed range extends to the end of the data
            res.loc[(res["timestamp"] >= expired_range[0]) & ((res["timestamp"] < expired_range[1]) if len(expired_range) == 2 else True), variable] = NaN

    # Change order of columns: move the second column to the end so the curve
    # columns sit between "relative" (first) and "timestamp" (last).
    cols = res.columns.tolist()
    cols = [cols[0]] + cols[2:] + [cols[1]]
    res = res[cols]

    return res
|
|
|
|
# ----- PRIVATE METHODS
|
|
|
|
def _get_all_setup_info_as_dict(self, time):
|
|
"""
|
|
Gets the value of the field setup_info in the measurements nicos/se_main, nicos/se_stick, nicos/se_addons as an array of Python dicts.
|
|
Takes the last setup_info dict (for each measurement) known at time.
|
|
|
|
Parameters
|
|
time (int) : the unix timestamps in seconds of the point in time to get the variables at.
|
|
|
|
Returns :
|
|
[{(str):((str), {...})}]: an array of the parsed "setup_info dict" of each measurements. The key is the secop_module prefixed with "se_", and the value is a tuple with its first value
|
|
being the type of Secop device for this module, and the value is too big to give its signature. Some tuple examples can be found under graphs/setup_info_examples.
|
|
|
|
"""
|
|
measurements = ["nicos/se_main", "nicos/se_stick", "nicos/se_addons"]
|
|
res = []
|
|
for measurement in measurements:
|
|
query = f"""
|
|
from(bucket: "{self._bucket}")
|
|
|> range(start: 0, stop: {time + 1})
|
|
|> filter(fn: (r) => r._measurement == "{measurement}")
|
|
|> filter(fn: (r) => r._field == "setup_info")
|
|
|> last()
|
|
|> yield(name: "res")
|
|
"""
|
|
tables = self._db.query(query)
|
|
for table in tables:
|
|
for record in table.records:
|
|
res.append(ast.literal_eval(record.get_value()))
|
|
return res
|
|
|
|
def _extract_variables(self, all_setup_info_dict):
|
|
"""
|
|
Extracts relevant information out of the setup_info dict for each available variable in measurements nicos/se_main, nicos/se_stick, nicos/se_addons
|
|
|
|
Parameters :
|
|
all_setup_info_dict ([{(str):((str), {...})}]) : an array of the parsed "setup_info dict" of each measurements. The key is the secop_module prefixed with "se_", and the value is a tuple with its first value
|
|
being the type of Secop device for this module, and the value is too big to give its signature. Some tuple examples can be found under graphs/setup_info_examples.
|
|
|
|
Returns :
|
|
[{"name":(str), "label":(str), "params":{(str):{"cat":(str), "color":(str), "unit":(str)}}}] : an array of dictionnaries, each containing the Influx name of the corresponding variable out of the setup_info dict,
|
|
the label to display in the Web GUI, and a dictionnary of parameters (including value), which consist of dictionnares with the category ("*" for value and target, "-" else), the color (empty for the moment)
|
|
and the unit ("1" if not available or empty), indexed by the name of the parameter.
|
|
|
|
"""
|
|
available_varirables = []
|
|
added_names = []
|
|
for setup_info_dict in all_setup_info_dict:
|
|
for (_, content) in setup_info_dict.items():
|
|
if content[0] != "nicos.devices.secop.devices.SecopDevice":
|
|
name = self._transform_secop_module_name_to_influx(content[1]["secop_module"])
|
|
if name not in added_names:
|
|
value_unit = "1" if (not "unit" in content[1].keys() or content[1]["unit"] == "") else content[1]["unit"]
|
|
variable = {
|
|
"name":name,
|
|
"label":content[1]["secop_module"],
|
|
"params":{"value":{"cat":"*", "color":"", "unit":value_unit}} # main value, shown by default
|
|
}
|
|
|
|
for param_name, param_content in content[1]["params_cfg"].items():
|
|
param_unit = "1" if (not "unit" in param_content.keys() or param_content["unit"] == "") else param_content["unit"]
|
|
variable["params"][param_name] = {
|
|
"cat":"*" if param_name == "target" else "-", # target is also shown by default, not the other parameters
|
|
"color":"",
|
|
"unit":param_unit
|
|
}
|
|
available_varirables.append(variable)
|
|
added_names.append(name)
|
|
return available_varirables
|
|
|
|
def _transform_secop_module_name_to_influx(self, secop_module_name):
|
|
"""
|
|
Transforms the name of the variable available in the setup_info dict into the Influx name.
|
|
Lowers the secop_module_name and adds "nicos/se_" as prefix
|
|
|
|
Parameters :
|
|
secop_module_name (str) : the secop module name of the variable in the setup_info dict.
|
|
|
|
Returns :
|
|
str : the transformed variable name that matches the Influx names reqauirements
|
|
"""
|
|
return MEASURMENT_PREFIX + secop_module_name.lower()
|
|
|
|
def _filter_params_with_config(self, available_variables, chart_config):
|
|
"""
|
|
Updates (cat, color, unit) the parameters of each variable according to the user_config object.
|
|
|
|
Parameters:
|
|
available_variables ([{"name":(str), "label":(str), "params":{(str):{"cat":(str), "color":(str), "unit":(str)}}}]) : an array of dictionnaries, each containing the Influx name of the corresponding variable out of the setup_info dict,
|
|
the label to display in the Web GUI, and a dictionnary of parameters (including value), which consist of dictionnaries with the category, the color and the unit, indexed by the name of the parameter.
|
|
chart_config (ChartConfig) : the object holding a configuration file for the chart.
|
|
|
|
Returns :
|
|
[{"name":(str), "label":(str), "params":{(str):{"cat":(str), "color":(str), "unit":(str)}}}] : the available_variables parameter, updated
|
|
"""
|
|
|
|
for variable in available_variables:
|
|
params = list(variable["params"].keys())
|
|
for param_key in params:
|
|
key = variable["label"] if param_key == "value" else variable["label"]+"."+param_key
|
|
param_config = chart_config.get_variable_parameter_config(key)
|
|
if param_config != None :
|
|
for key, value in param_config.items():
|
|
variable["params"][param_key][key] = value
|
|
|
|
return available_variables
|
|
|
|
def _filter_params_with_user_config(self, available_variables, user_config):
|
|
"""
|
|
Updates (cat, color, unit) the parameters of each variable according to the user_config object.
|
|
|
|
Parameters:
|
|
available_variables ([{"name":(str), "label":(str), "params":{(str):{"cat":(str), "color":(str), "unit":(str)}}}]) : an array of dictionnaries, each containing the Influx name of the corresponding variable out of the setup_info dict,
|
|
the label to display in the Web GUI, and a dictionnary of parameters (including value), which consist of dictionnaries with the category, the color and the unit, indexed by the name of the parameter.
|
|
user_config ({(str):{"cat":(str), "color":(str), "unit":(str)}}) : the Python dict representing the user configuration. The key is <secop_module.parameter>.
|
|
|
|
Returns :
|
|
[{"name":(str), "label":(str), "params":{(str):{"cat":(str), "color":(str), "unit":(str)}}}] : the available_variables parameter, updated
|
|
"""
|
|
|
|
for variable in available_variables:
|
|
params = list(variable["params"].keys())
|
|
for param_key in params:
|
|
key = variable["label"] if param_key == "value" else variable["label"]+"."+param_key
|
|
param_config = user_config[key] if key in user_config.keys() else None
|
|
if param_config != None :
|
|
for key, value in param_config.items():
|
|
variable["params"][param_key][key] = value
|
|
|
|
return available_variables
|
|
|
|
def _remove_variables_params_not_displayed(self, available_variables):
|
|
"""
|
|
Removes the parameters of each variable if their category is "-".
|
|
|
|
Parameters:
|
|
available_variables ([{"name":(str), "label":(str), "params":{(str):{"cat":(str), "color":(str), "unit":(str)}}}]) : an array of dictionnaries, each containing the Influx name of the corresponding variable out of the setup_info dict,
|
|
the label to display in the Web GUI, and a dictionnary of parameters (including value), which consist of dictionnaries with the category, the color and the unit, indexed by the name of the parameter.
|
|
|
|
Returns :
|
|
[{"name":(str), "label":(str), "params":{(str):{"cat":(str), "color":(str), "unit":(str)}}}] : the available_variables parameter, updated
|
|
"""
|
|
for variable in available_variables:
|
|
params = list(variable["params"].keys())
|
|
for param_key in params:
|
|
if variable["params"][param_key]["cat"] == "-":
|
|
del variable["params"][param_key]
|
|
|
|
return available_variables
|
|
|
|
def _remove_variables_params_wihout_param_float_and_split(self, available_variables, time):
|
|
"""
|
|
For each variable, removes the parameters if the Influx database does not contain <param>.float field, and split the parameters to the corresponding output format.
|
|
|
|
Parameters:
|
|
available_variables ([{"name":(str), "label":(str), "params":{(str):{"cat":(str), "color":(str), "unit":(str)}}}]) : an array of dictionnaries, each containing the Influx name of the corresponding variable out of the setup_info dict,
|
|
the label to display in the Web GUI, and a dictionnary of parameters (including value), which consist of dictionnares with the category, the color and the unit, indexed by the name of the parameter.
|
|
time (int) : the unix timestamp in seconds of the point in time to get the variables at. Used to have an upper limit in the query.
|
|
|
|
|
|
Returns :
|
|
[{"name":(str), "label":(str), "cat":(str), "color":(str), "unit":(str)}] : an array of dictionnaries, each containing the name of the variable[.<param>],
|
|
the label to display in the Web GUI, its category, its color and its unit.
|
|
"""
|
|
res = []
|
|
for variable in available_variables:
|
|
query = f"""
|
|
import "influxdata/influxdb/schema"
|
|
schema.measurementFieldKeys(bucket: "{self._bucket}", measurement: "{variable["name"]}", start:0, stop: {time + 1})
|
|
|> yield(name: "res")
|
|
"""
|
|
records = self._db.query(query)[0].records
|
|
fields = [record.get_value() for record in records]
|
|
for param in variable["params"].keys():
|
|
if param+"_float" in fields:
|
|
res.append({
|
|
"name": variable["name"] if param == "value" else variable["name"]+"."+param,
|
|
"label": variable["label"] if param == "value" else variable["label"]+"."+param,
|
|
"cat": variable["params"][param]["cat"],
|
|
"color": variable["params"][param]["color"],
|
|
"unit": variable["params"][param]["unit"]
|
|
})
|
|
|
|
return res
|
|
|
|
def _group_variables_by_cat_unit(self, available_variables):
|
|
"""
|
|
Performs a group by cat if specified (different than "*"), or by unit instead for the available variables
|
|
|
|
Parameters :
|
|
available_variables ([{"name":(str), "label":(str), "cat":(str), "color":(str), "unit":(str)}]) : an array of dictionnaries, each containing the name of the variable[.<param>],
|
|
the label to display in the Web GUI, its category, its color and its unit.
|
|
|
|
Returns :
|
|
[{"tag":(str), "unit":(str), "curves":[{"name":(str), "label":(str), "color":(str)}]] : a list of dictionnaries, each one representing
|
|
a block of curves with their name, their label and their color to display, grouped by their tag, which is the unit or the category if given and not "unit".
|
|
"""
|
|
groups = {}
|
|
|
|
for available_variable in available_variables:
|
|
key = available_variable["unit"]
|
|
if available_variable["cat"] != "*":
|
|
key = available_variable["cat"]
|
|
if key not in groups.keys():
|
|
groups[key] = {"tag":key, "unit":available_variable["unit"], "curves":[]}
|
|
groups[key]["curves"].append({
|
|
"name":available_variable["name"],
|
|
"label":available_variable["label"],
|
|
"color":available_variable["color"],
|
|
})
|
|
|
|
return list(groups.values())
|
|
|
|
def _get_curve(self, variable, parameter, time, interval=None):
|
|
"""
|
|
Gets the points (curve) within a timerange for the given variable and parameter.
|
|
|
|
Parameters :
|
|
variable (str) : the name (Influx) of the variable we want the values of.
|
|
parameter (str) : the parameter of the variable to get the values from
|
|
time ([(int)]) : the timerange we want the values in. It consists of two values which are Unix timestamps in seconds, first included, second excluded.
|
|
interval (int) : the interval (resolution) of the values to get (in milliseconds)
|
|
|
|
Returns :
|
|
[[(int), (float)]] : an array of pairs (also arrays), the first value being the Unix timestamp in second (x), the seconds being the value (y)
|
|
"""
|
|
raw = []
|
|
query = f"""
|
|
from(bucket: "{self._bucket}")
|
|
|> range(start: {time[0]}, stop: {time[1] + 1})
|
|
|> filter(fn : (r) => r._measurement == "{variable}")
|
|
|> filter(fn : (r) => r._field == "{parameter+"_float"}")
|
|
{"|> aggregateWindow(every: duration(v:"+str(self._milliseconds_to_nanoseconds(interval))+"), fn: last, createEmpty:false, timeDst:"+chr(34)+"binning_time"+chr(34)+")" if interval else ""}
|
|
|> keep(columns: ["_time","_value","expired"{", "+chr(34)+"binning_time"+chr(34) if interval else ""}])
|
|
|> yield(name: "res")
|
|
"""
|
|
tables = self._db.query(query)
|
|
for table in tables:
|
|
for record in table.records:
|
|
t = round(datetime.timestamp(record["binning_time"] if interval else record.get_time()), 3) # t is the real timestamp if no interval is given, or the binned timestamp if interval
|
|
value = record.get_value()
|
|
try:
|
|
value = PrettyFloat(value)
|
|
except:
|
|
value = None
|
|
point = [t, value, record["expired"]]
|
|
if interval: # t is the binning time, we need to add the real time of the point that was used in this interval
|
|
point.append(record.get_time())
|
|
raw.append(point)
|
|
|
|
if interval:
|
|
indexes_to_delete = []
|
|
expired_points = {i:point for i,point in enumerate(raw) if point[2] == "True"} #we need to keep track of the indexes of the expired point in the list
|
|
for expired_point_index, expired_point in expired_points.items():
|
|
for i, point in enumerate(raw):
|
|
if point[2] == "False" and expired_point[0] == point[0]: # if the current point is expired and has the same binning time as the current expired point
|
|
if point[3] > expired_point[3]: # comparison on the real timestamp used.
|
|
indexes_to_delete.insert(0, expired_point_index)
|
|
else:
|
|
indexes_to_delete.insert(0,i)
|
|
sorted(indexes_to_delete, reverse=True) #we have to make sure that the list is sorted in reverse to then delete at the given indexes
|
|
for index in indexes_to_delete:
|
|
del raw[index]
|
|
|
|
sorted_raw = sorted(raw, key=lambda pair: pair[0]) #expired=True and expired=False are in two Influx tables, so they need to be synchronized
|
|
res = []
|
|
for pair in sorted_raw:
|
|
if pair[2] == "True":
|
|
res.append([pair[0], pair[1]]) # So the user can know precisely when a curve is expired
|
|
res.append([pair[0], None]) # So chartJS will cut the curve from this point (which is expired)
|
|
else:
|
|
res.append([pair[0], pair[1]])
|
|
|
|
return self._insert_last_known_value(variable, parameter, res, time)
|
|
|
|
def _insert_last_known_value(self, variable, parameter, curve, time):
|
|
"""
|
|
Adds the last known value as the first point in the curve if the last known value is outside the viewing window, for the given variable and parameter.
|
|
The point is added only if it is not expired.
|
|
|
|
Parameters :
|
|
variable (str) : the name (Influx) of the variable we want the values of.
|
|
parameter (str) : the parameter of the variable to get the values from
|
|
curve ([[(int), (float)]]) : an array of pairs (arrays), the first value being the Unix timestamp in second (x), the seconds being the value (y)
|
|
time ([(int)]) : the timerange we want the values in. It consists of two values which are Unix timestamps in seconds, first included, second excluded.
|
|
|
|
Returns :
|
|
[[(int), (float)]] : the curve of the parameter, updated with a potential new first point
|
|
"""
|
|
|
|
if len(curve) == 0 or curve[0][0] != time[0]:
|
|
query = f"""
|
|
from(bucket: "{self._bucket}")
|
|
|> range(start: 0, stop: {time[0]+1})
|
|
|> filter(fn : (r) => r._measurement == "{variable}")
|
|
|> filter(fn : (r) => r._field == "{parameter+"_float"}")
|
|
|> last()
|
|
|> keep(columns: ["_time", "_value", "expired"])
|
|
|> yield(name: "res")
|
|
"""
|
|
tables = self._db.query(query)
|
|
pair_to_insert = []
|
|
for table in tables:
|
|
for record in table.records:
|
|
t = round(datetime.timestamp(record.get_time()), 3)
|
|
value = None
|
|
if record["expired"] == "False":
|
|
value = record.get_value()
|
|
try:
|
|
value = PrettyFloat(value)
|
|
except:
|
|
value = None
|
|
if len(pair_to_insert) == 0 or t >= pair_to_insert[0]:
|
|
pair_to_insert = [t, value]
|
|
|
|
if len(pair_to_insert)==2 and pair_to_insert[1] != None:
|
|
curve.insert(0, [time[0], pair_to_insert[1]])
|
|
return curve
|
|
|
|
def _get_last_values(self, variable, parameter, start_time, end_time):
|
|
"""
|
|
Gets the lastest values for the given variable and parameter that are in [start_time, end_time].
|
|
The process is the same as _get_curve.
|
|
|
|
Parameters :
|
|
variable (str) : the name (Influx) of the variable we want the last value of.
|
|
parameter (str) : the parameter of the variable to get the values from
|
|
start_time (int|None) : the start of time range (Unix timestamp in seconds) to include the values in
|
|
end_time (int) : the end of time range (Unix timestamp in seconds) to include the values in
|
|
|
|
Returns :
|
|
[[(int), (float)]] : an array of points (also arrays). The first value is the Unix timestamp in second (x), the seconds is the value (y)
|
|
"""
|
|
|
|
raw = []
|
|
query = f"""
|
|
from(bucket: "{self._bucket}")
|
|
|> range(start: {start_time if start_time != None else 0}, stop: {end_time+1})
|
|
|> filter(fn : (r) => r._measurement == "{variable}")
|
|
|> filter(fn : (r) => r._field == "{parameter+ "_float"}")
|
|
{"|> last()" if start_time == None else ""}
|
|
|> keep(columns: ["_time","_value", "expired"])
|
|
|> yield(name: "res")
|
|
"""
|
|
|
|
# this loop might be simplified, but it has to be kept to catch the case when there is unavailable data
|
|
tables = self._db.query(query)
|
|
for table in tables:
|
|
for record in table.records:
|
|
t = round(datetime.timestamp(record.get_time()), 3)
|
|
value = record.get_value()
|
|
try:
|
|
value = PrettyFloat(value)
|
|
except:
|
|
value = None
|
|
raw.append([t, value, record["expired"]])
|
|
|
|
sorted_raw = sorted(raw, key=lambda pair: pair[0])
|
|
res = []
|
|
for pair in sorted_raw:
|
|
if pair[2] == "True":
|
|
res.append([pair[0], pair[1]])
|
|
res.append([pair[0], None])
|
|
else:
|
|
res.append([pair[0], pair[1]])
|
|
|
|
return res
|
|
|
|
def _get_device_name_components(self, time):
|
|
"""
|
|
Gets the components of the device name, first in the main name, then stick, then addons.
|
|
|
|
Parameters :
|
|
time (int) : the Unix timestamp in seconds of the time we want the device name
|
|
|
|
Returns :
|
|
[str] : an array of string, each one being a component of the device name
|
|
"""
|
|
measurements = ["nicos/se_main", "nicos/se_stick", "nicos/se_addons"]
|
|
res = []
|
|
for measurement in measurements:
|
|
query = f"""
|
|
from(bucket: "{self._bucket}")
|
|
|> range(start: 0, stop: {time + 1})
|
|
|> filter(fn: (r) => r._measurement == "{measurement}")
|
|
|> filter(fn: (r) => r._field == "value")
|
|
|> last()
|
|
"""
|
|
tables = self._db.query(query)
|
|
for table in tables:
|
|
for record in table.records:
|
|
name = ast.literal_eval(record.get_value())
|
|
if name != None and name != '':
|
|
res.append(ast.literal_eval(record.get_value()))
|
|
return res
|
|
|
|
def _seconds_to_nanoseconds(self, seconds):
|
|
return seconds * 1000000000
|
|
|
|
def _milliseconds_to_nanoseconds(self, milliseconds):
|
|
return milliseconds * 1000000 |