Removed old influxdb methods

This commit is contained in:
l_samenv
2024-08-21 11:31:31 +02:00
parent c113e5373a
commit d3017860f8
2 changed files with 59 additions and 378 deletions

View File

@ -52,7 +52,7 @@ class InfluxDataGetter:
self._db = db
# ----- PUBLIC METHODS
def get_available_variables_at_time(self, times):
"""
Gets the available variables (those that we can have a value for since the device has been installed on the instrument) at the given point in time.
@ -60,7 +60,6 @@ class InfluxDataGetter:
Parameters :
times ([int]) : the unix timestamps in seconds of the range. The first value can be unused. The last can represent the point in time.
all (bool) : indicates if we want all the variables for the given times[1] timestamp (all the day)
Returns :
[{"tag":(str), "unit":(str), "curves":[{"name":(str), "label":(str), "color":(str)}]}] : a list of dictionnaries, each one representing
@ -68,33 +67,11 @@ class InfluxDataGetter:
"""
all_setup_info = self._get_all_setup_info_as_dict(times)
available_variables = self._extract_variables(all_setup_info)
available_variables = self._remove_variables_without_value_float(available_variables, times)
available_variables = self._set_variables_with_target(available_variables, times)
res = self._group_variables_by_unit(available_variables)
return res
def get_available_variables_at_time2(self, times):
"""
Gets the available variables (those that we can have a value for since the device has been installed on the instrument) at the given point in time.
We can get the last available variables at the given point in time or all the known variables for the day corresponding to the timestamp.
Parameters :
times ([int]) : the unix timestamps in seconds of the range. The first value can be unused. The last can represent the point in time.
all (bool) : indicates if we want all the variables for the given times[1] timestamp (all the day)
Returns :
[{"tag":(str), "unit":(str), "curves":[{"name":(str), "label":(str), "color":(str)}]}] : a list of dictionnaries, each one representing
a block of curves with their name, their label and their color to display, grouped by their tag (which can be the unit augmented with an index) and their unit.
"""
all_setup_info = self._get_all_setup_info_as_dict(times)
available_variables = self._extract_variables2(all_setup_info)
available_variables = self._filter_params_with_config(available_variables)
available_variables = self._remove_variables_params_wihout_param_float_and_split(available_variables, times)
res = self._group_variables_by_unit2(available_variables)
res = self._group_variables_by_cat_unit(available_variables)
return res
@ -102,34 +79,6 @@ class InfluxDataGetter:
"""
Gets the curves for the given variables within a timerange.
Parameters :
variables ([(str)]) : an array of variable names (Influx) to get the curves for
time ([int]) : the timerange we want the values in. It consists of two values which are Unix timestamps in seconds, first included, second excluded.
interval (int) : the interval (resolution) of the values to get (in nanoseconds)
Returns :
{(str):[[(int), (float)]]} : a dictionnary of curves. The key is the name of the influx variable, and the value is an array of pairs (also arrays), the first value being the Unix timestamp in second (x), the seconds being the value (y).
"""
res = {}
for variable in variables:
variable_name_for_query = variable
is_target = False
if variable_name_for_query.endswith(".target"):
variable_name_for_query = variable_name_for_query[:-len(".target")]
is_target = True
curve = self._get_curve(variable_name_for_query, is_target, time, interval)
if len(curve) > 0:
res[variable] = curve
return res
def get_curves_in_timerange2(self, variables, time, interval = None):
"""
Gets the curves for the given variables within a timerange.
Parameters :
variables ([(str)]) : an array of variable names (Influx) to get the curves for
time ([int]) : the timerange we want the values in. It consists of two values which are Unix timestamps in seconds, first included, second excluded.
@ -143,7 +92,7 @@ class InfluxDataGetter:
var_param = variable.split(".")
variable_name_for_query = var_param[0]
parameter = "value" if len(var_param) == 1 else var_param[1]
curve = self._get_curve2(variable_name_for_query, parameter, time, interval)
curve = self._get_curve(variable_name_for_query, parameter, time, interval)
if len(curve) > 0:
res[variable] = curve
@ -155,32 +104,7 @@ class InfluxDataGetter:
Parameters :
variables ([(str)]) : an array of variable names (Influx) to get the last known values for
end_time (int) : the Unix timestamp in seconds of the last point in time to include the values in
Returns :
{(str):[[(int), (float)]]} : a dictionnary of points. The key is the name of the influx variable, and the value is an array of pairs (also array), the first value being the Unix timestamp in second (x), the seconds being the value (y).
"""
res = {}
for variable in variables:
variable_name_for_query = variable
is_target = False
if variable_name_for_query.endswith(".target"):
variable_name_for_query = variable_name_for_query[:-len(".target")]
is_target = True
start_time = int(lastvalues[variable_name_for_query][0]) if variable_name_for_query in lastvalues.keys() else None
start_time = start_time if start_time != None and start_time <= end_time else None # to prevent self.lastvalues being changed by w_graph between entrance of graphpoll and poll_last_values
points = self._get_last_values(variable_name_for_query, is_target,start_time, end_time)
if len(points) > 0 :
res[variable] = points
return res
def poll_last_values2(self, variables, lastvalues, end_time):
"""
Polls the lastest values for the given variables since their last known point to end_time.
Parameters :
variables ([(str)]) : an array of variable names (Influx) to get the last known values for
lastvalues ({(str):((float), (float))}) : a dictionnary of tuples, first value being the floating Unix timestamp in seconds (precision = ms) of the last known value for the curve, and the second value being the associated value, indexed by the curve name
end_time (int) : the Unix timestamp in seconds of the last point in time to include the values in
Returns :
@ -194,7 +118,7 @@ class InfluxDataGetter:
parameter = "value" if len(var_param) == 1 else var_param[1]
start_time = int(lastvalues[variable_name_for_query][0]) if variable_name_for_query in lastvalues.keys() else None
start_time = start_time if start_time != None and start_time <= end_time else None # to prevent self.lastvalues being changed by w_graph between entrance of graphpoll and poll_last_values
points = self._get_last_values2(variable_name_for_query,parameter,start_time, end_time)
points = self._get_last_values(variable_name_for_query,parameter,start_time, end_time)
if len(points) > 0 :
res[variable] = points
return res
@ -246,7 +170,7 @@ class InfluxDataGetter:
res.extend(to_add)
return res
def _extract_variables(self, all_setup_info_dict):
"""
Extracts relevant information out of the setup_info dict for each available variable in measurements nicos/se_main, nicos/se_stick, nicos/se_addons
@ -256,39 +180,8 @@ class InfluxDataGetter:
being the type of Secop device for this module, and the value is too big to give its signature. Some tuple examples can be found under graphs/setup_info_examples.
Returns :
[{"name":(str), "label":(str), "unit":(str), "has_potential_target":(bool)}] : an array of dictionnaries, each containing the Influx name of the corresponding variable out of the setup_info dict,
the label to display in the Web GUI, its unit and a boolean value indicating if the variable has a potential target available.
"""
available_varirables = []
added_names = []
for setup_info_dict in all_setup_info_dict:
for (_, content) in setup_info_dict.items():
if content[0] != "nicos.devices.secop.devices.SecopDevice":
name = self._transform_secop_module_name_to_influx(content[1]["secop_module"])
if name not in added_names:
available_varirables.append(
{
"name":name,
"label":content[1]["secop_module"],
"unit":content[1]["unit"],
"has_potential_target": "target_datainfo" in content[1].keys()
}
)
added_names.append(name)
return available_varirables
def _extract_variables2(self, all_setup_info_dict):
"""
Extracts relevant information out of the setup_info dict for each available variable in measurements nicos/se_main, nicos/se_stick, nicos/se_addons
Parameters :
all_setup_info_dict ([{(str):((str), {...})}]) : an array of the parsed "setup_info dict" of each measurements. The key is the secop_module prefixed with "se_", and the value is a tuple with its first value
being the type of Secop device for this module, and the value is too big to give its signature. Some tuple examples can be found under graphs/setup_info_examples.
Returns :
[{"name":(str), "label":(str), "unit":(str), "has_potential_target":(bool)}] : an array of dictionnaries, each containing the Influx name of the corresponding variable out of the setup_info dict,
the label to display in the Web GUI, its unit and a boolean value indicating if the variable has a potential target available.
[{"name":(str), "label":(str), "params":{(str):{"cat":(str), "color":(str), "unit":(str)}}}] : an array of dictionnaries, each containing the Influx name of the corresponding variable out of the setup_info dict,
the label to display in the Web GUI, and a dictionnary of parameters (including value), which consist of dictionnares with the category (empty for the moment), the color (same) and the unit (if available), indexed by the name of the parameter.
"""
available_varirables = []
@ -329,6 +222,16 @@ class InfluxDataGetter:
return self._influx_instrument_config["measurement_prefix"] + secop_module_name.lower()
def _filter_params_with_config(self, available_variables):
"""
Removes and updates (cat, color, unit) the parameters according to the content of variables_config.ini file.
Parameters:
available_variables ([{"name":(str), "label":(str), "params":{(str):{"cat":(str), "color":(str), "unit":(str)}}}]) : an array of dictionnaries, each containing the Influx name of the corresponding variable out of the setup_info dict,
the label to display in the Web GUI, and a dictionnary of parameters (including value), which consist of dictionnares with the category (empty for the moment), the color (same) and the unit (if available), indexed by the name of the parameter.
Returns :
[{"name":(str), "label":(str), "params":{(str):{"cat":(str), "color":(str), "unit":(str)}}}] : the available_variables parameter, updated
"""
chart_config = ChartConfig()
@ -349,6 +252,18 @@ class InfluxDataGetter:
return available_variables
def _remove_variables_params_wihout_param_float_and_split(self, available_variables, times):
"""
For each variable, removes the parameters if the Influx database does not contain <param>.float field, and split the parameters to the corresponding output format.
Parameters:
available_variables ([{"name":(str), "label":(str), "params":{(str):{"cat":(str), "color":(str), "unit":(str)}}}]) : an array of dictionnaries, each containing the Influx name of the corresponding variable out of the setup_info dict,
the label to display in the Web GUI, and a dictionnary of parameters (including value), which consist of dictionnares with the category, the color and the unit, indexed by the name of the parameter.
times ([int]): (only second value used) the unix timestamps in seconds of the range at which we want to get the available variables.
Returns :
[{"name":(str), "label":(str), "cat":(str), "color":(str), "unit":(str)}] : an array of dictionnaries, each containing the name of the variable[.<param>],
the label to display in the Web GUI, its category, its color and its unit.
"""
res = []
for variable in available_variables:
query = f"""
@ -369,196 +284,37 @@ class InfluxDataGetter:
})
return res
def _remove_variables_without_value_float(self, available_variables, times):
"""
Removes some of the previously identified available_variables if they effectively do not have a value_float field in InfluxDB.
Parameters :
available_variables ([{"name":(str), "label":(str), "unit":(str), "has_potential_target":(bool)}]) : an array of dictionnaries, each containing the Influx name of the corresponding variable out of the setup_info dict,
the label to display in the Web GUI, its unit and a boolean value indicating if the variable has a potential target available.
times ([int]): (only second value used) the unix timestamps in seconds of the range at which we want to get the available variables (for the value).
Returns :
[{"name":(str), "label":(str), "unit":(str), "has_potential_target":(bool)}] : an array of dictionnaries (updated), each containing the Influx name of the corresponding variable out of the setup_info dict,
the label to display in the Web GUI, its unit and a boolean value indicating if the variable has a potential target available.
"""
res = []
for variable in available_variables:
query = f"""
import "influxdata/influxdb/schema"
schema.measurementFieldKeys(bucket: "{self._bucket}", measurement: "{variable["name"]}", start:0, stop: {times[1] + 1})
|> yield(name: "res")
"""
records = self._db.query(query)[0].records
if "value_float" in [record.get_value() for record in records]:
res.append(variable)
return res
def _set_variables_with_target(self, available_variables, times):
"""
Determines if the previously identified available_variables have effectively a target or not (meaning it has a target_float field in Influx).
Parameters :
available_variables ([{"name":(str), "label":(str), "unit":(str), "has_potential_target":(bool)}]) : an array of dictionnaries, each containing the Influx name of the corresponding variable out of the setup_info dict,
the label to display in the Web GUI, its unit and a boolean value indicating if the variable has a potential target available.
times ([int]): (only second value used) the unix timestamps in seconds of the range at which we want to get the available variables (for the target).
Returns :
[{"name":(str), "label":(str), "unit":(str), "has_potential_target":(bool)}] : an array of dictionnaries, each containing the Influx name of the corresponding variable out of the setup_info dict,
the label to display in the Web GUI, its unit and a boolean value indicating if the variable has a potential target available (updated).
"""
for variable in available_variables:
if variable["has_potential_target"]:
query = f"""
import "influxdata/influxdb/schema"
schema.measurementFieldKeys(bucket: "{self._bucket}", measurement: "{variable["name"]}", start:0, stop: {times[1] + 1})
|> yield(name: "res")
"""
records = self._db.query(query)[0].records
if not "target_float" in [record.get_value() for record in records]:
variable["has_potential_target"] = False
return available_variables
def _group_variables_by_unit2(self, available_variables):
"""Performs a group by unit, while removing useless information and adding target curves.
def _group_variables_by_cat_unit(self, available_variables):
"""
Performs a group by cat and unit for the available variables
Parameters :
available_variables ([{"name":(str), "label":(str), "unit":(str), "has_potential_target":(bool)}]) : an array of dictionnaries, each containing the Influx name of the corresponding variable out of the setup_info dict,
the label to display in the Web GUI, its unit and a boolean value indicating if the variable has a target available.
available_variables ([{"name":(str), "label":(str), "cat":(str), "color":(str), "unit":(str)}]) : an array of dictionnaries, each containing the name of the variable[.<param>],
the label to display in the Web GUI, its category, its color and its unit.
Returns :
[{"tag":(str), "unit":(str), "curves":[{"name":(str), "label":(str), "color":(str)}]] : a list of dictionnaries, each one representing
a block of curves with their name, their label and their color to display, grouped by their tag (which can be the unit augmented with an index) and their unit.
a block of curves with their name, their label and their color to display, grouped by their category and unit (in tag)
"""
groups = {}
for available_variable in available_variables:
self._append_variable2(groups, available_variable)
return list(groups.values())
def _append_variable2(self, groups, variable):
key = available_variable["cat"]+"_"+available_variable["unit"] if available_variable["cat"] != "" else available_variable["unit"]
if key not in groups.keys():
groups[key] = {"tag":key, "unit":available_variable["unit"], "curves":[]}
groups[key]["curves"].append({
"name":available_variable["name"],
"label":available_variable["label"],
"color":available_variable["color"],
})
return list(groups.values())
def _get_curve(self, variable, parameter, time, interval=None):
"""
Appends the variable in the unit group with a tag and a color, and creates the unit key if not available.
Parameters :
groups ({}) : a dictionnary that contains the curves grouped by unit, which will be updated
variable ({"name":(str), "label":(str), "unit":(str)[,"has_potential_target":(bool)]}) : a dictionnary containing the Influx name of the corresponding variable out of the setup_info dict,
the label to display in the Web GUI, its unit and possibly a boolean value indicating if the variable has a target available.
"""
key = variable["cat"]+"_"+variable["unit"] if variable["cat"] != "" else variable["unit"]
if key not in groups.keys():
groups[key] = {"tag":key, "unit":variable["unit"], "curves":[]} #tag is set to cat+unit while client is not modified
groups[key]["curves"].append({
"name":variable["name"],
"label":variable["label"],
"color":variable["color"],
})
def _group_variables_by_unit(self, available_variables):
"""Performs a group by unit, while removing useless information and adding target curves.
Parameters :
available_variables ([{"name":(str), "label":(str), "unit":(str), "has_potential_target":(bool)}]) : an array of dictionnaries, each containing the Influx name of the corresponding variable out of the setup_info dict,
the label to display in the Web GUI, its unit and a boolean value indicating if the variable has a target available.
Returns :
[{"tag":(str), "unit":(str), "curves":[{"name":(str), "label":(str), "color":(str)}]] : a list of dictionnaries, each one representing
a block of curves with their name, their label and their color to display, grouped by their tag (which can be the unit augmented with an index) and their unit.
"""
groups = {}
for available_variable in available_variables:
if available_variable["has_potential_target"]:
target_variable = self._get_formatted_target_variable(available_variable)
self._append_variable(groups, target_variable)
self._append_variable(groups, available_variable)
return list(groups.values())
def _get_formatted_target_variable(self, variable):
"""
Formats the variable which has a target to be added to the unit groups, meaning it adds ".target" as the suffix in the Influx name and label, and removes the "has_target" value.
Parameters :
variable ({"name":(str), "label":(str), "unit":(str), "has_potential_target":(bool)}) : a dictionnary containing the Influx name of the corresponding variable out of the setup_info dict,
the label to display in the Web GUI, its unit and a boolean value indicating if the variable has a target available.
Returns :
{"name":(str), "label":(str), "unit":(str)} : a dictionnary containing the Influx name of the corresponding variable out of the setup_info dict (augmented with ".target" suffix),
the label (augmented with ".target" suffix) to display in the Web GUI.
"""
return {
"name":variable["name"]+".target",
"label":variable["label"]+".target",
"unit":variable["unit"]
}
def _append_variable(self, groups, variable):
"""
Appends the variable in the unit group with a tag and a color, and creates the unit key if not available.
Parameters :
groups ({}) : a dictionnary that contains the curves grouped by unit, which will be updated
variable ({"name":(str), "label":(str), "unit":(str)[,"has_potential_target":(bool)]}) : a dictionnary containing the Influx name of the corresponding variable out of the setup_info dict,
the label to display in the Web GUI, its unit and possibly a boolean value indicating if the variable has a target available.
"""
if variable["unit"] not in groups.keys():
groups[variable["unit"]] = {"tag":variable["unit"], "unit":variable["unit"], "curves":[]}
groups[variable["unit"]]["curves"].append({
"name":variable["name"],
"label":variable["label"],
"color":""
})
def _get_curve(self, variable, is_target, time, interval=None):
"""
Gets the points (curve) within a timerange for the given variable.
Parameters :
variable (str) : the name (Influx) of the variable we want the values of.
is_target (bool) : tells if the given variable is a target, or not (if variable is "nicos/se_t_chip.target", then is_target has to be set to True)
time ([(int)]) : the timerange we want the values in. It consists of two values which are Unix timestamps in seconds, first included, second excluded.
interval (int) : the interval (resolution) of the values to get (in nanoseconds)
Returns :
[[(int), (float)]] : an array of pairs (also arrays), the first value being the Unix timestamp in second (x), the seconds being the value (y)
"""
res = []
query = f"""
from(bucket: "{self._bucket}")
|> range(start: {time[0]}, stop: {time[1] + 1})
|> filter(fn : (r) => r._measurement == "{variable}")
|> filter(fn : (r) => r._field == "{"target_float" if is_target else "value_float"}")
{"|> aggregateWindow(every: duration(v:"+str(interval)+"), fn: last, createEmpty:false)" if interval else ""}
|> keep(columns: ["_time","_value"])
|> yield(name: "res")
"""
tables = self._db.query(query)
for table in tables:
for record in table.records:
t = round(datetime.timestamp(record.get_time()), 3)
value = record.get_value()
try:
value = PrettyFloat(value)
except:
value = None
res.append([t, value])
return self._insert_last_known_value(variable, is_target, res, time)
def _get_curve2(self, variable, parameter, time, interval=None):
"""
Gets the points (curve) within a timerange for the given variable.
Gets the points (curve) within a timerange for the given variable and parameter.
Parameters :
variable (str) : the name (Influx) of the variable we want the values of.
@ -590,47 +346,11 @@ class InfluxDataGetter:
value = None
res.append([t, value])
return self._insert_last_known_value2(variable, parameter, res, time)
return self._insert_last_known_value(variable, parameter, res, time)
def _insert_last_known_value(self, variable, is_target, curve, time):
def _insert_last_known_value(self, variable, parameter, curve, time):
"""
Adds the last known value as the first point in the curve if the last known value is outside the viewing window.
Parameters :
variable (str) : the name (Influx) of the variable we want the values of.
is_target (bool) : tells if the given variable is a target, or not (if variable is "nicos/se_t_chip.target", then is_target has to be set to True)
curve ([[(int), (float)]]) : an array of pairs (arrays), the first value being the Unix timestamp in second (x), the seconds being the value (y)
time ([(int)]) : the timerange we want the values in. It consists of two values which are Unix timestamps in seconds, first included, second excluded.
Returns :
[[(int), (float)]] : the curve of the parameter, updated with a potential new first point
"""
if len(curve) == 0 or curve[0][0] != time[0]:
query = f"""
from(bucket: "{self._bucket}")
|> range(start: 0, stop: {time[0]+1})
|> filter(fn : (r) => r._measurement == "{variable}")
|> filter(fn : (r) => r._field == "{"target_float" if is_target else "value_float"}")
|> last()
|> keep(columns: ["_value"])
|> yield(name: "res")
"""
tables = self._db.query(query)
for table in tables:
for record in table.records:
value = record.get_value()
try:
value = PrettyFloat(value)
except:
value = None
curve.insert(0, [time[0], value])
return curve
def _insert_last_known_value2(self, variable, parameter, curve, time):
"""
Adds the last known value as the first point in the curve if the last known value is outside the viewing window.
Adds the last known value as the first point in the curve if the last known value is outside the viewing window, for the given variable and parameter.
Parameters :
variable (str) : the name (Influx) of the variable we want the values of.
@ -662,50 +382,11 @@ class InfluxDataGetter:
except:
value = None
curve.insert(0, [time[0], value])
return curve
return curve
def _get_last_values(self, variable, is_target, start_time, end_time):
def _get_last_values(self, variable, parameter, start_time, end_time):
"""
Gets the lastest values for the given variable that are in [start_time, end_time].
Parameters :
variable (str) : the name (Influx) of the variable we want the last value of.
is_target (bool) : tells if the given variable is a target, or not (if variable is "nicos/se_t_chip.target", then is_target has to be set to True)
start_time (int|None) : the start of time range (Unix timestamp in seconds) to include the values in
end_time (int) : the end of time range (Unix timestamp in seconds) to include the values in
Returns :
[[(int), (float)]] : an array of points (also arrays). The first value is the Unix timestamp in second (x), the seconds is the value (y)
"""
res = []
query = f"""
from(bucket: "{self._bucket}")
|> range(start: {start_time if start_time != None else 0}, stop: {end_time+1})
|> filter(fn : (r) => r._measurement == "{variable}")
|> filter(fn : (r) => r._field == "{"target_float" if is_target else "value_float"}")
{"|> last()" if start_time == None else ""}
|> keep(columns: ["_time","_value"])
|> yield(name: "res")
"""
# this loop might be simplified, but it has to be kept to catch the case when there is unavailable data
tables = self._db.query(query)
for table in tables:
for record in table.records:
t = round(datetime.timestamp(record.get_time()), 3)
value = record.get_value()
try:
value = PrettyFloat(value)
except:
value = None
res.append([t, value])
return res
def _get_last_values2(self, variable, parameter, start_time, end_time):
"""
Gets the lastest values for the given variable that are in [start_time, end_time].
Gets the lastest values for the given variable and parameter that are in [start_time, end_time].
Parameters :
variable (str) : the name (Influx) of the variable we want the last value of.

View File

@ -98,7 +98,7 @@ class InfluxGraph:
logging.info('LIVE %g %g %d %d', end, now, end >= now, self.livemode)
if interval : interval = self.milliseconds_to_nano(int(interval))
result = self.influx_data_getter.get_curves_in_timerange2(queried_variables, queried_time_range, interval)
result = self.influx_data_getter.get_curves_in_timerange(queried_variables, queried_time_range, interval)
self.complete_to_end(result, min(end, now))
self.end_query = end
# reduction not yet implemented
@ -137,7 +137,7 @@ class InfluxGraph:
start_time = int(self.get_abs_time(time)[0])
end_time = int(self.get_abs_time(time)[-1])
blocks = self.influx_data_getter.get_available_variables_at_time2([start_time, end_time])
blocks = self.influx_data_getter.get_available_variables_at_time([start_time, end_time])
device_name = self.influx_data_getter.get_device_name(end_time)
# updates the self.variables attribute to keep track of the available variables
self.variables = [variable["name"] for block in blocks for variable in block["curves"]]
@ -177,7 +177,7 @@ class InfluxGraph:
return None
now, = self.get_abs_time([0])
result = self.influx_data_getter.poll_last_values2(self.variables, self.lastvalues, now)
result = self.influx_data_getter.poll_last_values(self.variables, self.lastvalues, now)
for variable in self.lastvalues.keys():
if variable in result.keys():