597 lines
30 KiB
Python
597 lines
30 KiB
Python
from influxdb_client import InfluxDBClient
|
|
from configparser import ConfigParser
|
|
from chart_config import ChartConfig
|
|
import ast
|
|
from datetime import datetime
|
|
from pandas import DataFrame as df, merge_ordered
|
|
from numpy import NaN
|
|
|
|
class InfluxDB:
|
|
"""
|
|
Class used to handle the connection with the InfluxDB instance
|
|
"""
|
|
def __init__(self):
|
|
config = ConfigParser()
|
|
config.read("./config/config.ini")
|
|
self._client = InfluxDBClient(url=config["INFLUX"]["url"], token=config["INFLUX"]["token"],
|
|
org=config["INFLUX"]["org"])
|
|
|
|
def disconnet(self):
|
|
"""
|
|
Disconnects from the InfluxDB instance
|
|
"""
|
|
self._client.close()
|
|
|
|
def query(self, query_str):
|
|
"""
|
|
Executes the query on the InfluxDB instance
|
|
|
|
Parameters :
|
|
query_str (string) : the Flux query to execute
|
|
|
|
Returns :
|
|
TableList : an InfluxDB list of the tables returned by the query
|
|
"""
|
|
return self._client.query_api().query(query_str)
|
|
|
|
def query_data_frame(self, query_str):
|
|
return self._client.query_api().query_data_frame(query_str)
|
|
|
|
class PrettyFloat(float):
|
|
"""saves bandwidth when converting to JSON
|
|
|
|
a lot of numbers originally have a fixed (low) number of decimal digits
|
|
as the binary representation is not exact, it might happen, that a
|
|
lot of superfluous digits are transmitted:
|
|
|
|
str(1/10*3) == '0.30000000000000004'
|
|
str(PrettyFloat(1/10*3)) == '0.3'
|
|
"""
|
|
def __repr__(self):
|
|
return '%.15g' % self
|
|
|
|
class InfluxDataGetter:
|
|
def __init__(self, db, influx_instrument_config):
|
|
self._influx_instrument_config = influx_instrument_config
|
|
self._bucket = self._influx_instrument_config["bucket"]
|
|
self._db = db
|
|
|
|
# ----- PUBLIC METHODS
|
|
|
|
def get_available_variables_at_time(self, times):
|
|
"""
|
|
Gets the available variables (those that we can have a value for since the device has been installed on the instrument) at the given point in time.
|
|
We can get the last available variables at the given point in time or all the known variables for the day corresponding to the timestamp.
|
|
|
|
Parameters :
|
|
times ([int]) : the unix timestamps in seconds of the range. The first value can be unused. The last can represent the point in time.
|
|
|
|
Returns :
|
|
[{"tag":(str), "unit":(str), "curves":[{"name":(str), "label":(str), "color":(str)}]}] : a list of dictionnaries, each one representing
|
|
a block of curves with their name, their label and their color to display, grouped by their tag (which can be the unit augmented with an index) and their unit.
|
|
"""
|
|
|
|
all_setup_info = self._get_all_setup_info_as_dict(times)
|
|
|
|
available_variables = self._extract_variables(all_setup_info)
|
|
available_variables = self._filter_params_with_config(available_variables)
|
|
available_variables = self._remove_variables_params_wihout_param_float_and_split(available_variables, times)
|
|
res = self._group_variables_by_cat_unit(available_variables)
|
|
|
|
return res
|
|
|
|
def get_curves_in_timerange(self, variables, time, interval = None):
|
|
"""
|
|
Gets the curves for the given variables within a timerange.
|
|
|
|
Parameters :
|
|
variables ([(str)]) : an array of variable names (Influx) to get the curves for
|
|
time ([int]) : the timerange we want the values in. It consists of two values which are Unix timestamps in seconds, first included, second excluded.
|
|
interval (int) : the interval (resolution) of the values to get (in nanoseconds)
|
|
|
|
Returns :
|
|
{(str):[[(int), (float)]]} : a dictionnary of curves. The key is the name of the influx variable, and the value is an array of pairs (also arrays), the first value being the Unix timestamp in second (x), the seconds being the value (y).
|
|
"""
|
|
res = {}
|
|
for variable in variables:
|
|
var_param = variable.split(".")
|
|
variable_name_for_query = var_param[0]
|
|
parameter = "value" if len(var_param) == 1 else var_param[1]
|
|
curve = self._get_curve(variable_name_for_query, parameter, time, interval)
|
|
# if len(curve) > 0:
|
|
res[variable] = curve
|
|
|
|
return res
|
|
|
|
def poll_last_values(self, variables, lastvalues, end_time):
|
|
"""
|
|
Polls the lastest values for the given variables since their last known point to end_time.
|
|
|
|
Parameters :
|
|
variables ([(str)]) : an array of variable names (Influx) to get the last known values for
|
|
lastvalues ({(str):((float), (float))}) : a dictionnary of tuples, first value being the floating Unix timestamp in seconds (precision = ms) of the last known value for the curve, and the second value being the associated value, indexed by the curve name
|
|
end_time (int) : the Unix timestamp in seconds of the last point in time to include the values in
|
|
|
|
Returns :
|
|
{(str):[[(int), (float)]]} : a dictionnary of points. The key is the name of the influx variable, and the value is an array of pairs (also array), the first value being the Unix timestamp in second (x), the seconds being the value (y).
|
|
"""
|
|
res = {}
|
|
for variable in variables:
|
|
|
|
var_param = variable.split(".")
|
|
variable_name_for_query = var_param[0]
|
|
parameter = "value" if len(var_param) == 1 else var_param[1]
|
|
start_time = int(lastvalues[variable][0]) if variable in lastvalues.keys() else None #if the call to poll_last_values is more recent than getgraph, we trigger only one value, which is the last one available from 0 to end_time
|
|
points = []
|
|
if start_time == None or start_time < end_time: # start_time might be more recent than end_time if getgraph is called between graphpoll and pol_last_values (influxgraph.py)
|
|
points = self._get_last_values(variable_name_for_query,parameter,start_time, end_time)
|
|
if len(points) > 0 :
|
|
res[variable] = points
|
|
return res
|
|
|
|
def get_device_name(self, time):
|
|
"""
|
|
Gets the device name available at time with stick and addons
|
|
|
|
Parameters :
|
|
time (int) : the Unix timestamp in seconds of the time we want the device name
|
|
|
|
Returns :
|
|
str : the device name
|
|
"""
|
|
components = self._get_device_name_components(time)
|
|
return "/".join(components)
|
|
|
|
def get_curves_data_frame(self, variables, times, interval):
|
|
"""
|
|
Gets the curves for the given variables within a timerange times, as a pandas dataframe.
|
|
All curves are on a single common time axis.
|
|
The first column is called "relative", and consists of floating seconds, relative to the beginning of the query.
|
|
The "timestamp" column (absolute floating UNIX timestamps in seconds, precise to the nanosecond) is the last one.
|
|
If a curve does not have a point at a given point, the last known value for this curve is used.
|
|
If a curve gets expired, it is filled with NaN value for the corresponding expired time windows.
|
|
The first value (for each variable) is the last known value before the time interval.
|
|
|
|
Parameters :
|
|
variables ([(str)]) : an array of variable names (Influx) to get the curves for.
|
|
times ([int]) : the timerange we want the values in. It consists of two values which are Unix timestamps in seconds, first included, second excluded.
|
|
interval (int) : the interval (resolution) of the values to get (in nanoseconds). Allows data binning.
|
|
|
|
Returns :
|
|
pandas.DataFrame : the curves in a single pandas DataFrame
|
|
"""
|
|
variables_info = {}
|
|
for variable in variables:
|
|
var_param = variable.split(".")
|
|
variable_name_for_query = var_param[0]
|
|
parameter = "value" if len(var_param) == 1 else var_param[1]
|
|
|
|
variables_info[variable] = {}
|
|
variables_info[variable]["expired_ranges"] = []
|
|
query = f"""
|
|
from(bucket: "{self._bucket}")
|
|
|> range(start: {times[0]}, stop: {times[1] + 1})
|
|
|> filter(fn : (r) => r._measurement == "{variable_name_for_query}")
|
|
|> filter(fn : (r) => r._field == "{parameter+"_float"}")
|
|
{"|> aggregateWindow(every: duration(v: "+ str(self._seconds_to_nanoseconds(interval))+"), fn: last, createEmpty:false)" if interval != "None" else ""}
|
|
|> map(fn: (r) => ({{r with relative: ( float(v: uint(v: r._time) - uint(v:{self._seconds_to_nanoseconds(times[0])})) / 1000000000.0 )}}))
|
|
|> map(fn: (r) => ({{r with timestamp: float(v: uint(v: r._time)) / 1000000000.0}}))
|
|
|> drop(columns:["_start", "_stop", "_field"])
|
|
|> pivot(rowKey:["relative", "timestamp", "expired"], columnKey: ["_measurement"], valueColumn: "_value")
|
|
"""
|
|
data_frame = self._db.query_data_frame(query)
|
|
|
|
# Needed for last known value
|
|
query_last_known = f"""
|
|
from(bucket: "{self._bucket}")
|
|
|> range(start: 0, stop: {times[0] + 1})
|
|
|> filter(fn : (r) => r._measurement == "{variable_name_for_query}")
|
|
|> filter(fn : (r) => r._field == "{parameter+"_float"}")
|
|
|> last()
|
|
|> map(fn: (r) => ({{r with relative: 0.0}}))
|
|
|> map(fn: (r) => ({{r with timestamp: float(v: uint(v: r._time)) / 1000000000.0}}))
|
|
|> drop(columns:["_start", "_stop", "_field"])
|
|
|> pivot(rowKey:["relative", "timestamp", "expired"], columnKey: ["_measurement"], valueColumn: "_value")
|
|
"""
|
|
|
|
data_frame_last_known = self._db.query_data_frame(query_last_known)
|
|
row_to_insert = None
|
|
for index, row in data_frame_last_known.iterrows():
|
|
try: #needed because row_to_insert == None is not possible
|
|
if row_to_insert.empty or row["timestamp"] > row_to_insert["timestamp"]:
|
|
row_to_insert = row
|
|
except:
|
|
row_to_insert = row
|
|
try:
|
|
if not row_to_insert.empty :
|
|
row_to_insert["timestamp"] = float(times[0])
|
|
data_frame.loc[-1] = row_to_insert
|
|
except:
|
|
pass
|
|
data_frame.drop(["result", "table"], axis=1, inplace=True)
|
|
data_frame.sort_values(by=["timestamp"], inplace=True)
|
|
data_frame.reset_index()
|
|
|
|
# Identify time windows for which the curve is expired
|
|
|
|
for index, row in data_frame.iterrows():
|
|
if row["expired"] == "True":
|
|
data_frame.loc[index, variable] = NaN
|
|
variables_info[variable]["expired_ranges"].append([row["timestamp"]])
|
|
elif row["expired"] == "False":
|
|
if len(variables_info[variable]["expired_ranges"]) > 0:
|
|
variables_info[variable]["expired_ranges"][-1].append(row["timestamp"])
|
|
data_frame.reset_index()
|
|
data_frame.drop(["expired"], axis=1, inplace=True)
|
|
variables_info[variable]["df"] = data_frame
|
|
res = None
|
|
|
|
# Merge single curve dataframes to a global one
|
|
if len(variables) == 1:
|
|
res = variables_info[variables[0]]["df"]
|
|
elif len(variables) == 2:
|
|
res = merge_ordered(variables_info[variables[0]]["df"], variables_info[variables[1]]["df"], on=["timestamp", "relative"], suffixes=(None, None))
|
|
else :
|
|
for i,variable in enumerate(variables):
|
|
if i == 1:
|
|
res = merge_ordered(variables_info[variables[0]]["df"], variables_info[variables[1]]["df"], on=["timestamp", "relative"], suffixes=(None, None))
|
|
elif i > 1:
|
|
res = merge_ordered(res, variables_info[variables[i]]["df"], on=["timestamp", "relative"], suffixes=(None, None))
|
|
|
|
# Forward fill missing values, then set data points to NaN for those which are expired
|
|
if len(variables) > 1:
|
|
res.ffill(inplace=True)
|
|
for variable, info in variables_info.items():
|
|
for range in info["expired_ranges"]:
|
|
res.loc[(res["timestamp"] >= range[0]) & ((res["timestamp"] < range[1]) if len(range) == 2 else True), variable] = NaN
|
|
|
|
# Change order of columns
|
|
cols = res.columns.tolist()
|
|
cols = [cols[0]] + cols[2:] + [cols[1]]
|
|
res = res[cols]
|
|
|
|
return res
|
|
|
|
# ----- PRIVATE METHODS
|
|
|
|
def _get_all_setup_info_as_dict(self, times):
|
|
"""
|
|
Gets the value of the field setup_info in the measurements nicos/se_main, nicos/se_stick, nicos/se_addons as an array of Python dicts.
|
|
Takes the last setup_info dict (for each measurement) known at times[1], or all the dicts for this day + the previous known (also for each)
|
|
|
|
Parameters
|
|
times ([int]) : the unix timestamps in seconds of the range. The first value can be unused. The last can represent the point in time.
|
|
|
|
Returns :
|
|
[{(str):((str), {...})}]: an array of the parsed "setup_info dict" of each measurements. The key is the secop_module prefixed with "se_", and the value is a tuple with its first value
|
|
being the type of Secop device for this module, and the value is too big to give its signature. Some tuple examples can be found under graphs/setup_info_examples.
|
|
|
|
"""
|
|
measurements = ["nicos/se_main", "nicos/se_stick", "nicos/se_addons"]
|
|
res = []
|
|
for measurement in measurements:
|
|
to_add = []
|
|
query = f"""
|
|
from(bucket: "{self._bucket}")
|
|
|> range(start: 0, stop: {times[1] + 1})
|
|
|> filter(fn: (r) => r._measurement == "{measurement}")
|
|
|> filter(fn: (r) => r._field == "setup_info")
|
|
|> last()
|
|
|> yield(name: "res")
|
|
"""
|
|
tables = self._db.query(query)
|
|
for table in tables:
|
|
for record in table.records:
|
|
to_add.append(ast.literal_eval(record.get_value()))
|
|
|
|
res.extend(to_add)
|
|
return res
|
|
|
|
def _extract_variables(self, all_setup_info_dict):
|
|
"""
|
|
Extracts relevant information out of the setup_info dict for each available variable in measurements nicos/se_main, nicos/se_stick, nicos/se_addons
|
|
|
|
Parameters :
|
|
all_setup_info_dict ([{(str):((str), {...})}]) : an array of the parsed "setup_info dict" of each measurements. The key is the secop_module prefixed with "se_", and the value is a tuple with its first value
|
|
being the type of Secop device for this module, and the value is too big to give its signature. Some tuple examples can be found under graphs/setup_info_examples.
|
|
|
|
Returns :
|
|
[{"name":(str), "label":(str), "params":{(str):{"cat":(str), "color":(str), "unit":(str)}}}] : an array of dictionnaries, each containing the Influx name of the corresponding variable out of the setup_info dict,
|
|
the label to display in the Web GUI, and a dictionnary of parameters (including value), which consist of dictionnares with the category (empty for the moment), the color (same) and the unit (if available), indexed by the name of the parameter.
|
|
|
|
"""
|
|
available_varirables = []
|
|
added_names = []
|
|
for setup_info_dict in all_setup_info_dict:
|
|
for (_, content) in setup_info_dict.items():
|
|
if content[0] != "nicos.devices.secop.devices.SecopDevice":
|
|
name = self._transform_secop_module_name_to_influx(content[1]["secop_module"])
|
|
if name not in added_names:
|
|
|
|
variable = {
|
|
"name":name,
|
|
"label":content[1]["secop_module"],
|
|
"params":{"value":{"cat":"", "color":"", "unit":content[1]["unit"]}} # main value
|
|
}
|
|
|
|
for param_name, param_content in content[1]["params_cfg"].items():
|
|
variable["params"][param_name] = {
|
|
"cat":"",
|
|
"color":"",
|
|
"unit":param_content["unit"] if "unit" in param_content.keys() else "" #unit might not be there
|
|
}
|
|
available_varirables.append(variable)
|
|
added_names.append(name)
|
|
return available_varirables
|
|
|
|
def _transform_secop_module_name_to_influx(self, secop_module_name):
|
|
"""
|
|
Transforms the name of the variable available in the setup_info dict into the Influx name.
|
|
Lowers the secop_module_name and adds "nicos/se_" as prefix
|
|
|
|
Parameters :
|
|
secop_module_name (str) : the secop module name of the variable in the setup_info dict.
|
|
|
|
Returns :
|
|
str : the transformed variable name that matches the Influx names reqauirements
|
|
"""
|
|
return self._influx_instrument_config["measurement_prefix"] + secop_module_name.lower()
|
|
|
|
def _filter_params_with_config(self, available_variables):
|
|
"""
|
|
Removes and updates (cat, color, unit) the parameters according to the content of variables_config.ini file.
|
|
|
|
Parameters:
|
|
available_variables ([{"name":(str), "label":(str), "params":{(str):{"cat":(str), "color":(str), "unit":(str)}}}]) : an array of dictionnaries, each containing the Influx name of the corresponding variable out of the setup_info dict,
|
|
the label to display in the Web GUI, and a dictionnary of parameters (including value), which consist of dictionnares with the category (empty for the moment), the color (same) and the unit (if available), indexed by the name of the parameter.
|
|
|
|
Returns :
|
|
[{"name":(str), "label":(str), "params":{(str):{"cat":(str), "color":(str), "unit":(str)}}}] : the available_variables parameter, updated
|
|
"""
|
|
|
|
chart_config = ChartConfig()
|
|
|
|
for variable in available_variables:
|
|
params = list(variable["params"].keys())
|
|
for param_key in params:
|
|
key = variable["label"] if param_key == "value" else variable["label"]+"."+param_key
|
|
param_config = chart_config.get_variable_parameter_config(key)
|
|
if param_config == None : # no entries were found
|
|
if param_key != "target" and param_key != "value": # and the current param is not value or target which are displayed by default
|
|
del variable["params"][param_key]
|
|
else:
|
|
if "cat" in param_config.keys() and param_config["cat"] == "None": # cat might not have been indicated
|
|
del variable["params"][param_key]
|
|
else:
|
|
for key, value in param_config.items():
|
|
variable["params"][param_key][key] = value
|
|
return available_variables
|
|
|
|
def _remove_variables_params_wihout_param_float_and_split(self, available_variables, times):
|
|
"""
|
|
For each variable, removes the parameters if the Influx database does not contain <param>.float field, and split the parameters to the corresponding output format.
|
|
|
|
Parameters:
|
|
available_variables ([{"name":(str), "label":(str), "params":{(str):{"cat":(str), "color":(str), "unit":(str)}}}]) : an array of dictionnaries, each containing the Influx name of the corresponding variable out of the setup_info dict,
|
|
the label to display in the Web GUI, and a dictionnary of parameters (including value), which consist of dictionnares with the category, the color and the unit, indexed by the name of the parameter.
|
|
times ([int]): (only second value used) the unix timestamps in seconds of the range at which we want to get the available variables.
|
|
|
|
Returns :
|
|
[{"name":(str), "label":(str), "cat":(str), "color":(str), "unit":(str)}] : an array of dictionnaries, each containing the name of the variable[.<param>],
|
|
the label to display in the Web GUI, its category, its color and its unit.
|
|
"""
|
|
res = []
|
|
for variable in available_variables:
|
|
query = f"""
|
|
import "influxdata/influxdb/schema"
|
|
schema.measurementFieldKeys(bucket: "{self._bucket}", measurement: "{variable["name"]}", start:0, stop: {times[1] + 1})
|
|
|> yield(name: "res")
|
|
"""
|
|
records = self._db.query(query)[0].records
|
|
fields = [record.get_value() for record in records]
|
|
for param in variable["params"].keys():
|
|
if param+"_float" in fields:
|
|
res.append({
|
|
"name": variable["name"] if param == "value" else variable["name"]+"."+param,
|
|
"label": variable["label"] if param == "value" else variable["label"]+"."+param,
|
|
"cat": variable["params"][param]["cat"],
|
|
"color": variable["params"][param]["color"],
|
|
"unit": variable["params"][param]["unit"]
|
|
})
|
|
|
|
return res
|
|
|
|
def _group_variables_by_cat_unit(self, available_variables):
|
|
"""
|
|
Performs a group by cat and unit for the available variables
|
|
|
|
Parameters :
|
|
available_variables ([{"name":(str), "label":(str), "cat":(str), "color":(str), "unit":(str)}]) : an array of dictionnaries, each containing the name of the variable[.<param>],
|
|
the label to display in the Web GUI, its category, its color and its unit.
|
|
|
|
Returns :
|
|
[{"tag":(str), "unit":(str), "curves":[{"name":(str), "label":(str), "color":(str)}]] : a list of dictionnaries, each one representing
|
|
a block of curves with their name, their label and their color to display, grouped by their category and unit (in tag)
|
|
"""
|
|
groups = {}
|
|
|
|
for available_variable in available_variables:
|
|
|
|
key = available_variable["cat"]+"_"+available_variable["unit"] if available_variable["cat"] != "" else available_variable["unit"]
|
|
if key not in groups.keys():
|
|
groups[key] = {"tag":key, "unit":available_variable["unit"], "curves":[]}
|
|
groups[key]["curves"].append({
|
|
"name":available_variable["name"],
|
|
"label":available_variable["label"],
|
|
"color":available_variable["color"],
|
|
})
|
|
|
|
return list(groups.values())
|
|
|
|
def _get_curve(self, variable, parameter, time, interval=None):
|
|
"""
|
|
Gets the points (curve) within a timerange for the given variable and parameter.
|
|
|
|
Parameters :
|
|
variable (str) : the name (Influx) of the variable we want the values of.
|
|
parameter (str) : the parameter of the variable to get the values from
|
|
time ([(int)]) : the timerange we want the values in. It consists of two values which are Unix timestamps in seconds, first included, second excluded.
|
|
interval (int) : the interval (resolution) of the values to get (in nanoseconds)
|
|
|
|
Returns :
|
|
[[(int), (float)]] : an array of pairs (also arrays), the first value being the Unix timestamp in second (x), the seconds being the value (y)
|
|
"""
|
|
raw = []
|
|
query = f"""
|
|
from(bucket: "{self._bucket}")
|
|
|> range(start: {time[0]}, stop: {time[1] + 1})
|
|
|> filter(fn : (r) => r._measurement == "{variable}")
|
|
|> filter(fn : (r) => r._field == "{parameter+"_float"}")
|
|
{"|> aggregateWindow(every: duration(v:"+str(interval)+"), fn: last, createEmpty:false)" if interval else ""}
|
|
|> keep(columns: ["_time","_value","expired"])
|
|
|> yield(name: "res")
|
|
"""
|
|
tables = self._db.query(query)
|
|
for table in tables:
|
|
for record in table.records:
|
|
t = round(datetime.timestamp(record.get_time()), 3)
|
|
value = record.get_value()
|
|
try:
|
|
value = PrettyFloat(value)
|
|
except:
|
|
value = None
|
|
raw.append([t, value, record["expired"]])
|
|
sorted_raw = sorted(raw, key=lambda pair: pair[0]) #expired=True and expired=False are in two Influx tables, so they need to be synchronized
|
|
res = []
|
|
for pair in sorted_raw:
|
|
if pair[2] == "True":
|
|
res.append([pair[0], pair[1]]) # So the user can know precisely when a curve is expired
|
|
res.append([pair[0], None]) # So chartJS will cut the curve from this point (which is expired)
|
|
else:
|
|
res.append([pair[0], pair[1]])
|
|
|
|
return self._insert_last_known_value(variable, parameter, res, time)
|
|
|
|
def _insert_last_known_value(self, variable, parameter, curve, time):
|
|
"""
|
|
Adds the last known value as the first point in the curve if the last known value is outside the viewing window, for the given variable and parameter.
|
|
The point is added only if it is not expired.
|
|
|
|
Parameters :
|
|
variable (str) : the name (Influx) of the variable we want the values of.
|
|
parameter (str) : the parameter of the variable to get the values from
|
|
curve ([[(int), (float)]]) : an array of pairs (arrays), the first value being the Unix timestamp in second (x), the seconds being the value (y)
|
|
time ([(int)]) : the timerange we want the values in. It consists of two values which are Unix timestamps in seconds, first included, second excluded.
|
|
|
|
Returns :
|
|
[[(int), (float)]] : the curve of the parameter, updated with a potential new first point
|
|
"""
|
|
|
|
if len(curve) == 0 or curve[0][0] != time[0]:
|
|
query = f"""
|
|
from(bucket: "{self._bucket}")
|
|
|> range(start: 0, stop: {time[0]+1})
|
|
|> filter(fn : (r) => r._measurement == "{variable}")
|
|
|> filter(fn : (r) => r._field == "{parameter+"_float"}")
|
|
|> last()
|
|
|> keep(columns: ["_time", "_value", "expired"])
|
|
|> yield(name: "res")
|
|
"""
|
|
tables = self._db.query(query)
|
|
pair_to_insert = []
|
|
for table in tables:
|
|
for record in table.records:
|
|
t = round(datetime.timestamp(record.get_time()), 3)
|
|
value = None
|
|
if record["expired"] == "False":
|
|
value = record.get_value()
|
|
try:
|
|
value = PrettyFloat(value)
|
|
except:
|
|
value = None
|
|
if len(pair_to_insert) == 0 or t >= pair_to_insert[0]:
|
|
pair_to_insert = [t, value]
|
|
|
|
if len(pair_to_insert)==2 and pair_to_insert[1] != None:
|
|
curve.insert(0, [time[0], pair_to_insert[1]])
|
|
return curve
|
|
|
|
def _get_last_values(self, variable, parameter, start_time, end_time):
|
|
"""
|
|
Gets the lastest values for the given variable and parameter that are in [start_time, end_time].
|
|
The process is the same as _get_curve.
|
|
|
|
Parameters :
|
|
variable (str) : the name (Influx) of the variable we want the last value of.
|
|
parameter (str) : the parameter of the variable to get the values from
|
|
start_time (int|None) : the start of time range (Unix timestamp in seconds) to include the values in
|
|
end_time (int) : the end of time range (Unix timestamp in seconds) to include the values in
|
|
|
|
Returns :
|
|
[[(int), (float)]] : an array of points (also arrays). The first value is the Unix timestamp in second (x), the seconds is the value (y)
|
|
"""
|
|
|
|
raw = []
|
|
query = f"""
|
|
from(bucket: "{self._bucket}")
|
|
|> range(start: {start_time if start_time != None else 0}, stop: {end_time+1})
|
|
|> filter(fn : (r) => r._measurement == "{variable}")
|
|
|> filter(fn : (r) => r._field == "{parameter+ "_float"}")
|
|
{"|> last()" if start_time == None else ""}
|
|
|> keep(columns: ["_time","_value", "expired"])
|
|
|> yield(name: "res")
|
|
"""
|
|
|
|
# this loop might be simplified, but it has to be kept to catch the case when there is unavailable data
|
|
tables = self._db.query(query)
|
|
for table in tables:
|
|
for record in table.records:
|
|
t = round(datetime.timestamp(record.get_time()), 3)
|
|
value = record.get_value()
|
|
try:
|
|
value = PrettyFloat(value)
|
|
except:
|
|
value = None
|
|
raw.append([t, value, record["expired"]])
|
|
|
|
sorted_raw = sorted(raw, key=lambda pair: pair[0])
|
|
res = []
|
|
for pair in sorted_raw:
|
|
if pair[2] == "True":
|
|
res.append([pair[0], pair[1]])
|
|
res.append([pair[0], None])
|
|
else:
|
|
res.append([pair[0], pair[1]])
|
|
|
|
return res
|
|
|
|
def _get_device_name_components(self, time):
|
|
"""
|
|
Gets the components of the device name, first in the main name, then stick, then addons.
|
|
|
|
Parameters :
|
|
time (int) : the Unix timestamp in seconds of the time we want the device name
|
|
|
|
Returns :
|
|
[str] : an array of string, each one being a component of the device name
|
|
"""
|
|
measurements = ["nicos/se_main", "nicos/se_stick", "nicos/se_addons"]
|
|
res = []
|
|
for measurement in measurements:
|
|
query = f"""
|
|
from(bucket: "{self._bucket}")
|
|
|> range(start: 0, stop: {time + 1})
|
|
|> filter(fn: (r) => r._measurement == "{measurement}")
|
|
|> filter(fn: (r) => r._field == "value")
|
|
|> last()
|
|
"""
|
|
tables = self._db.query(query)
|
|
for table in tables:
|
|
for record in table.records:
|
|
name = ast.literal_eval(record.get_value())
|
|
if name != None and name != '':
|
|
res.append(ast.literal_eval(record.get_value()))
|
|
return res
|
|
|
|
def _seconds_to_nanoseconds(self, seconds):
|
|
return seconds * 1000000000 |