Added InfluxDataGetter method get_curves_data_frame to return curves as a pandas DataFrame

This commit is contained in:
l_samenv
2024-08-26 11:45:00 +02:00
parent 07946fc851
commit c706b78d07

View File

@@ -3,6 +3,8 @@ from configparser import ConfigParser
from chart_config import ChartConfig
import ast
from datetime import datetime
from pandas import DataFrame as df, merge_ordered
from numpy import NaN
class InfluxDB:
    """
@@ -31,7 +33,10 @@ class InfluxDB:
    TableList : an InfluxDB list of the tables returned by the query
    """
    return self._client.query_api().query(query_str)
def query_data_frame(self, query_str):
    """Runs a Flux query and returns its result as pandas DataFrame(s).

    Parameters :
        query_str (str) : the Flux query to execute.

    Returns :
        pandas.DataFrame (or list of pandas.DataFrame) : the query result,
        as produced by the InfluxDB client's query API.
    """
    query_api = self._client.query_api()
    return query_api.query_data_frame(query_str)
class PrettyFloat(float):
    """saves bandwidth when converting to JSON
@@ -137,6 +142,113 @@ class InfluxDataGetter:
    components = self._get_device_name_components(time)
    return "/".join(components)
def get_curves_data_frame(self, variables, times, interval):
    """
    Gets the curves for the given variables within a timerange times, as a pandas dataframe.

    All curves are on a single common time axis.
    The first column is called "relative", and consists of floating seconds, relative to the beginning of the query.
    The "timestamp" column (absolute floating UNIX timestamps in seconds, precise to the nanosecond) is the last one.
    If a curve does not have a point at a given time, the last known value for this curve is used.
    If a curve gets expired, it is filled with NaN values for the corresponding expired time windows.
    The first value (for each variable) is the last known value before the time interval.

    Parameters :
        variables ([str]) : an array of variable names (Influx) to get the curves for.
            Each entry is "<measurement>" or "<measurement>.<parameter>" ("value" by default).
        times ([int]) : the timerange we want the values in. It consists of two values which
            are Unix timestamps in seconds, first included, second excluded.
        interval (int) : the interval (resolution) of the values to get, in seconds (it is
            converted to nanoseconds for Flux). Allows data binning. The string "None"
            disables binning (NOTE(review): callers apparently pass this as a string).

    Returns :
        pandas.DataFrame : the curves in a single pandas DataFrame

    Raises :
        ValueError : if variables is empty.
    """
    if not variables:
        # Fail early with a clear message instead of an obscure IndexError below.
        raise ValueError("variables must not be empty")

    variables_info = {}
    for variable in variables:
        var_param = variable.split(".")
        measurement = var_param[0]
        parameter = "value" if len(var_param) == 1 else var_param[1]
        field = parameter + "_float"
        variables_info[variable] = {"expired_ranges": []}

        # Optional data binning: keep the last point of each interval-wide window.
        binning = ""
        if interval != "None":
            binning = ("|> aggregateWindow(every: duration(v: "
                       + str(self._seconds_to_nanoseconds(interval))
                       + "), fn: last, createEmpty:false)")
        query = f"""
            from(bucket: "{self._bucket}")
                |> range(start: {times[0]}, stop: {times[1] + 1})
                |> filter(fn : (r) => r._measurement == "{measurement}")
                |> filter(fn : (r) => r._field == "{field}")
                {binning}
                |> map(fn: (r) => ({{r with relative: ( float(v: uint(v: r._time) - uint(v:{self._seconds_to_nanoseconds(times[0])})) / 1000000000.0 )}}))
                |> map(fn: (r) => ({{r with timestamp: float(v: uint(v: r._time)) / 1000000000.0}}))
                |> drop(columns:["_start", "_stop", "_field"])
                |> pivot(rowKey:["relative", "timestamp", "expired"], columnKey: ["_measurement"], valueColumn: "_value")
        """
        data_frame = self._db.query_data_frame(query)

        # Needed for the last value known before the start of the window.
        query_last_known = f"""
            from(bucket: "{self._bucket}")
                |> range(start: 0, stop: {times[0] + 1})
                |> filter(fn : (r) => r._measurement == "{measurement}")
                |> filter(fn : (r) => r._field == "{field}")
                |> last()
                |> map(fn: (r) => ({{r with relative: 0.0}}))
                |> map(fn: (r) => ({{r with timestamp: float(v: uint(v: r._time)) / 1000000000.0}}))
                |> drop(columns:["_start", "_stop", "_field"])
                |> pivot(rowKey:["relative", "timestamp", "expired"], columnKey: ["_measurement"], valueColumn: "_value")
        """
        data_frame_last_known = self._db.query_data_frame(query_last_known)

        # Pick the most recent "last known" row. "is None" is required here:
        # comparing a pandas Series with "== None" yields an elementwise
        # Series whose truth value raises ValueError.
        row_to_insert = None
        for _, row in data_frame_last_known.iterrows():
            if row_to_insert is None or row["timestamp"] > row_to_insert["timestamp"]:
                row_to_insert = row
        if row_to_insert is not None and not row_to_insert.empty:
            # Anchor the last known value at the start of the requested window.
            row_to_insert["timestamp"] = float(times[0])
            data_frame.loc[-1] = row_to_insert

        data_frame.drop(["result", "table"], axis=1, inplace=True)
        data_frame.sort_values(by=["timestamp"], inplace=True)
        data_frame.reset_index(drop=True, inplace=True)

        # Identify the time windows for which the curve is expired: a range
        # opens on an "expired" point and closes on the next live point (it
        # may stay open, meaning "expired until the end of the data").
        for index, row in data_frame.iterrows():
            if row["expired"] == "True":
                data_frame.loc[index, variable] = NaN
                variables_info[variable]["expired_ranges"].append([row["timestamp"]])
            elif row["expired"] == "False":
                if variables_info[variable]["expired_ranges"]:
                    variables_info[variable]["expired_ranges"][-1].append(row["timestamp"])
        data_frame.drop(["expired"], axis=1, inplace=True)
        variables_info[variable]["df"] = data_frame

    # Merge the single-curve dataframes into one on the shared time axis.
    res = variables_info[variables[0]]["df"]
    for variable in variables[1:]:
        res = merge_ordered(res, variables_info[variable]["df"],
                            on=["timestamp", "relative"], suffixes=(None, None))

    # Forward fill missing values, then blank out the data points falling
    # into an expired window (forward filling must not resurrect them).
    if len(variables) > 1:
        res.ffill(inplace=True)
    for variable, info in variables_info.items():
        for expired_range in info["expired_ranges"]:
            in_range = res["timestamp"] >= expired_range[0]
            if len(expired_range) == 2:
                in_range &= res["timestamp"] < expired_range[1]
            res.loc[in_range, variable] = NaN

    # Change order of columns: "relative" first, "timestamp" last.
    cols = res.columns.tolist()
    res = res[[cols[0]] + cols[2:] + [cols[1]]]
    return res
# ----- PRIVATE METHODS
def _get_all_setup_info_as_dict(self, times):
@@ -476,4 +588,7 @@ class InfluxDataGetter:
name = ast.literal_eval(record.get_value())
if name != None and name != '':
    res.append(ast.literal_eval(record.get_value()))
return res
def _seconds_to_nanoseconds(self, seconds):
return seconds * 1000000000