From c706b78d07d8b2a846c4c9df5a9cc6defa5d89fc Mon Sep 17 00:00:00 2001
From: l_samenv
Date: Mon, 26 Aug 2024 11:45:00 +0200
Subject: [PATCH] Added influxDataGetter method to get curves as dataframe

---
 influxdb.py | 119 +++++++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 117 insertions(+), 2 deletions(-)

diff --git a/influxdb.py b/influxdb.py
index d4f0e6f..a9116ef 100644
--- a/influxdb.py
+++ b/influxdb.py
@@ -3,6 +3,8 @@ from configparser import ConfigParser
 from chart_config import ChartConfig
 import ast
 from datetime import datetime
+from pandas import DataFrame as df, merge_ordered
+from numpy import NaN
 
 class InfluxDB:
     """
@@ -31,7 +33,10 @@ class InfluxDB:
             TableList : an InfluxDB list of the tables returned by the query
         """
         return self._client.query_api().query(query_str)
-    
+
+    def query_data_frame(self, query_str):
+        return self._client.query_api().query_data_frame(query_str)
+
 class PrettyFloat(float):
     """saves bandwidth when converting to JSON
@@ -137,6 +142,113 @@ class InfluxDataGetter:
         components = self._get_device_name_components(time)
         return "/".join(components)
 
+    def get_curves_data_frame(self, variables, times, interval):
+        """
+        Gets the curves for the given variables within the time range "times", as a single pandas DataFrame.
+        All curves share a common time axis.
+        The first column is called "relative" and consists of floating-point seconds relative to the beginning of the queried range.
+        The last column is "timestamp" (absolute floating-point Unix timestamps in seconds, with nanosecond precision).
+        If a curve does not have a point at a given timestamp, the last known value of this curve is used.
+        If a curve expires, it is filled with NaN values for the corresponding expired time windows.
+        The first value (for each variable) is the last known value before the queried time range.
+
+        Parameters :
+            variables ([str]) : an array of variable names (Influx) to get the curves for.
+            times ([int]) : the time range to get the values in, given as two Unix timestamps in seconds, first included, second excluded.
+            interval (int) : the resolution of the values to get, in seconds, used for data binning; the string "None" disables binning.
+
+        Returns :
+            pandas.DataFrame : the curves in a single pandas DataFrame
+        """
+        variables_info = {}
+        for variable in variables:
+            var_param = variable.split(".")
+            variable_name_for_query = var_param[0]
+            parameter = "value" if len(var_param) == 1 else var_param[1]
+
+            variables_info[variable] = {}
+            variables_info[variable]["expired_ranges"] = []
+            query = f"""
+                from(bucket: "{self._bucket}")
+                    |> range(start: {times[0]}, stop: {times[1] + 1})
+                    |> filter(fn : (r) => r._measurement == "{variable_name_for_query}")
+                    |> filter(fn : (r) => r._field == "{parameter+"_float"}")
+                    {"|> aggregateWindow(every: duration(v: "+ str(self._seconds_to_nanoseconds(interval))+"), fn: last, createEmpty:false)" if interval != "None" else ""}
+                    |> map(fn: (r) => ({{r with relative: ( float(v: uint(v: r._time) - uint(v:{self._seconds_to_nanoseconds(times[0])})) / 1000000000.0 )}}))
+                    |> map(fn: (r) => ({{r with timestamp: float(v: uint(v: r._time)) / 1000000000.0}}))
+                    |> drop(columns:["_start", "_stop", "_field"])
+                    |> pivot(rowKey:["relative", "timestamp", "expired"], columnKey: ["_measurement"], valueColumn: "_value")
+            """
+            data_frame = self._db.query_data_frame(query)
+
+            # Needed for the last known value before the queried time range
+            query_last_known = f"""
+                from(bucket: "{self._bucket}")
+                    |> range(start: 0, stop: {times[0] + 1})
+                    |> filter(fn : (r) => r._measurement == "{variable_name_for_query}")
+                    |> filter(fn : (r) => r._field == "{parameter+"_float"}")
+                    |> last()
+                    |> map(fn: (r) => ({{r with relative: 0.0}}))
+                    |> map(fn: (r) => ({{r with timestamp: float(v: uint(v: r._time)) / 1000000000.0}}))
+                    |> drop(columns:["_start", "_stop", "_field"])
+                    |> pivot(rowKey:["relative", "timestamp", "expired"], columnKey: ["_measurement"], valueColumn: "_value")
+            """
+            data_frame_last_known = self._db.query_data_frame(query_last_known)
+
+            # Keep only the most recent row returned by the "last known value" query
+            # and insert it at the start of the time range
+            row_to_insert = None
+            for index, row in data_frame_last_known.iterrows():
+                if row_to_insert is None or row["timestamp"] > row_to_insert["timestamp"]:
+                    row_to_insert = row
+            if row_to_insert is not None and not row_to_insert.empty:
+                row_to_insert["timestamp"] = float(times[0])
+                data_frame.loc[-1] = row_to_insert
+
+            data_frame.drop(["result", "table"], axis=1, inplace=True)
+            data_frame.sort_values(by=["timestamp"], inplace=True)
+            data_frame.reset_index(drop=True, inplace=True)
+
+            # Identify the time windows for which the curve is expired
+            for index, row in data_frame.iterrows():
+                if row["expired"] == "True":
+                    data_frame.loc[index, variable] = NaN
+                    variables_info[variable]["expired_ranges"].append([row["timestamp"]])
+                elif row["expired"] == "False":
+                    if len(variables_info[variable]["expired_ranges"]) > 0:
+                        variables_info[variable]["expired_ranges"][-1].append(row["timestamp"])
+            data_frame.drop(["expired"], axis=1, inplace=True)
+            variables_info[variable]["df"] = data_frame
+
+        # Merge the single curve dataframes into a global one
+        res = variables_info[variables[0]]["df"]
+        for variable in variables[1:]:
+            res = merge_ordered(res, variables_info[variable]["df"], on=["timestamp", "relative"], suffixes=(None, None))
+
+        # Forward fill missing values, then set the expired data points back to NaN
+        if len(variables) > 1:
+            res.ffill(inplace=True)
+            for variable, info in variables_info.items():
+                for expired_range in info["expired_ranges"]:
+                    res.loc[(res["timestamp"] >= expired_range[0]) & ((res["timestamp"] < expired_range[1]) if len(expired_range) == 2 else True), variable] = NaN
+
+        # Reorder the columns: "relative" first, then the variable columns, "timestamp" last
+        cols = res.columns.tolist()
+        cols = [cols[0]] + cols[2:] + [cols[1]]
+        res = res[cols]
+
+        return res
+
     # ----- PRIVATE METHODS
 
     def _get_all_setup_info_as_dict(self, times):
@@ -476,4 +588,7 @@ class InfluxDataGetter:
             name = ast.literal_eval(record.get_value())
             if name != None and name != '':
                 res.append(ast.literal_eval(record.get_value()))
-        return res
\ No newline at end of file
+        return res
+
+    def _seconds_to_nanoseconds(self, seconds):
+        return seconds * 1000000000
\ No newline at end of file
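
A minimal usage sketch of the new method, for illustration only. It assumes an InfluxDataGetter instance named "getter" is already constructed and connected (its setup is not part of this patch); the variable names below are hypothetical "<measurement>.<parameter>" examples.

    from time import time

    now = int(time())
    variables = ["se_ts", "se_ts.target"]   # hypothetical Influx variable names
    times = [now - 3600, now]               # last hour, in Unix seconds: start included, stop excluded
    interval = 10                           # bin to a 10 second resolution; pass "None" to disable binning

    curves = getter.get_curves_data_frame(variables, times, interval)
    print(curves.columns.tolist())          # "relative" first, the variable columns, "timestamp" last
    print(curves.head())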