From b9507908a067122d5e58840b0c54aa2b70d98598 Mon Sep 17 00:00:00 2001
From: l_samenv
Date: Thu, 29 Aug 2024 13:14:58 +0200
Subject: [PATCH] Fix export bug where two points share the same relative value

---
 influxdb.py | 34 ++++++++++++++++++++++++++++++----
 1 file changed, 30 insertions(+), 4 deletions(-)

diff --git a/influxdb.py b/influxdb.py
index 29e48d6..6a22f8e 100644
--- a/influxdb.py
+++ b/influxdb.py
@@ -166,19 +166,45 @@ class InfluxDataGetter:
             variable_name_for_query = var_param[0]
             parameter = "value" if len(var_param) == 1 else var_param[1]
 
+            # When binning is requested, we rename the "_time" column to plain "time", because the "binned points with same timestamp" deduplication below relies on it.
+            # chr(34) is the double-quote character; we use it because quotes cannot be backslash-escaped inside f-string expressions.
             query = f"""
                 from(bucket: "{self._bucket}")
                     |> range(start: {times[0]}, stop: {times[1] + 1})
                     |> filter(fn : (r) => r._measurement == "{variable_name_for_query}")
                     |> filter(fn : (r) => r._field == "{parameter+"_float"}")
-                    {"|> aggregateWindow(every: duration(v: "+ str(self._seconds_to_nanoseconds(interval))+"), fn: last, createEmpty:false)" if interval != "None" else ""}
-                    |> map(fn: (r) => ({{r with relative: ( float(v: uint(v: r._time) - uint(v:{self._seconds_to_nanoseconds(times[0])})) / 1000000000.0 )}}))
-                    |> map(fn: (r) => ({{r with timestamp: float(v: uint(v: r._time)) / 1000000000.0}}))
+                    {"|> aggregateWindow(every: duration(v: "+ str(self._seconds_to_nanoseconds(interval))+"), fn: last, createEmpty:false, timeDst:"+chr(34)+"binning_time"+chr(34)+")" if interval != "None" else ""}
+                    |> map(fn: (r) => ({{r with relative: ( float(v: uint(v: {"r.binning_time" if interval != "None" else "r._time"}) - uint(v:{self._seconds_to_nanoseconds(times[0])})) / 1000000000.0 )}}))
+                    |> map(fn: (r) => ({{r with timestamp: float(v: uint(v: {"r.binning_time" if interval != "None" else "r._time"})) / 1000000000.0}}))
+                    {"|> rename(columns: {_time:"+chr(34)+"time"+chr(34)+"})" if interval != "None" else ""}
                     |> drop(columns:["_start", "_stop", "_field"])
-                    |> pivot(rowKey:["relative", "timestamp", "expired"], columnKey: ["_measurement"], valueColumn: "_value")
+                    |> pivot(rowKey:["relative", "timestamp", "expired"{", "+chr(34)+"time"+chr(34) if interval != "None" else ""}], columnKey: ["_measurement"], valueColumn: "_value")
                 """
 
             data_frame = self._db.query_data_frame(query)
+            # If binning is requested, two rows can end up with the same timestamp/relative value: an interval may contain both expired=True and expired=False points, and aggregateWindow keeps the last point of each group, producing one row per group.
+            # For each such pair, we compare the real timestamps of the points that fed the interval and keep the more recent value.
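+            # Example (illustrative values): a 60 s bin containing an expired reading taken at t = 12 s and a valid one taken at t = 47 s yields two rows with the same binned timestamp and relative value.
+            # The valid row must be kept here, because its real "time" (47 s) is the more recent within the bin.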
"tuple" is a Python named tuple + for expired_point_tuple in expired_rows.itertuples(): + # Then, we identify if there is a valid point with the same relative time as the current expired point + corresponding_valid_point = data_frame.loc[(data_frame["expired"] == "False") & (data_frame["relative"] == expired_point_tuple.relative)] + # we make sure that there is only one corresponding valid point, even if in theory, there will never be more than one corresponding valid point + if not corresponding_valid_point.empty and len(corresponding_valid_point.index) == 1: + # if we did not rename "_time" to "time" sooner in the query, "_time" would have been renamed to a positionnal name (because it starts with a _, see itertuples() doc), making confusion while reading the code + if corresponding_valid_point.iloc[0]["time"] > expired_point_tuple.time: + data_frame.drop(expired_point_tuple.Index, inplace=True) + else: + data_frame.drop(corresponding_valid_point.index, inplace=True) + # we do not need the "time" column anymore + data_frame.drop(["time"], axis=1, inplace=True) + # Needed for last known value query_last_known = f""" from(bucket: "{self._bucket}")