Fixed two points with same relative value bug (export)
This commit is contained in:
32
influxdb.py
32
influxdb.py
@ -166,19 +166,43 @@ class InfluxDataGetter:
|
||||
variable_name_for_query = var_param[0]
|
||||
parameter = "value" if len(var_param) == 1 else var_param[1]
|
||||
|
||||
# we need to rename the "_time" column to simply "time" in case we want binning because of the comparison done later in the "binned points with same timestamp" process.
|
||||
# chr(34) is the double quote char, because we cannot escape them in a f string
|
||||
query = f"""
|
||||
from(bucket: "{self._bucket}")
|
||||
|> range(start: {times[0]}, stop: {times[1] + 1})
|
||||
|> filter(fn : (r) => r._measurement == "{variable_name_for_query}")
|
||||
|> filter(fn : (r) => r._field == "{parameter+"_float"}")
|
||||
{"|> aggregateWindow(every: duration(v: "+ str(self._seconds_to_nanoseconds(interval))+"), fn: last, createEmpty:false)" if interval != "None" else ""}
|
||||
|> map(fn: (r) => ({{r with relative: ( float(v: uint(v: r._time) - uint(v:{self._seconds_to_nanoseconds(times[0])})) / 1000000000.0 )}}))
|
||||
|> map(fn: (r) => ({{r with timestamp: float(v: uint(v: r._time)) / 1000000000.0}}))
|
||||
{"|> aggregateWindow(every: duration(v: "+ str(self._seconds_to_nanoseconds(interval))+"), fn: last, createEmpty:false, timeDst:"+chr(34)+"binning_time"+chr(34)+")" if interval != "None" else ""}
|
||||
|> map(fn: (r) => ({{r with relative: ( float(v: uint(v: {"r.binning_time" if interval != "None" else "r._time"}) - uint(v:{self._seconds_to_nanoseconds(times[0])})) / 1000000000.0 )}}))
|
||||
|> map(fn: (r) => ({{r with timestamp: float(v: uint(v: {"r.binning_time" if interval != "None" else "r._time"})) / 1000000000.0}}))
|
||||
{"|> rename(columns: {_time:"+chr(34)+"time"+chr(34)+"})" if interval != "None" else ""}
|
||||
|> drop(columns:["_start", "_stop", "_field"])
|
||||
|> pivot(rowKey:["relative", "timestamp", "expired"], columnKey: ["_measurement"], valueColumn: "_value")
|
||||
|> pivot(rowKey:["relative", "timestamp", "expired"{", "+chr(34)+"time"+chr(34) if interval != "None" else ""}], columnKey: ["_measurement"], valueColumn: "_value")
|
||||
"""
|
||||
data_frame = self._db.query_data_frame(query)
|
||||
|
||||
# If there is a binning asked, there can be two points with the same timestamp/relative value, because points with expired=True and expired=False can be found in the same interval, leading to two rows, one with last True, and one with last False.
|
||||
# For each identified couple, we look at the real timestamp of the points used to feed this interval. We then keep the value which is the more recent in this interval.
|
||||
|
||||
if interval != "None" and not data_frame.empty:
|
||||
# we first identify the expired points
|
||||
expired_rows = data_frame.loc[data_frame["expired"] == "True"]
|
||||
|
||||
# we use itertuples to preserve the pandas dtypes, so comparisons can be done. "tuple" is a Python named tuple
|
||||
for expired_point_tuple in expired_rows.itertuples():
|
||||
# Then, we identify if there is a valid point with the same relative time as the current expired point
|
||||
corresponding_valid_point = data_frame.loc[(data_frame["expired"] == "False") & (data_frame["relative"] == expired_point_tuple.relative)]
|
||||
# we make sure that there is only one corresponding valid point, even if in theory, there will never be more than one corresponding valid point
|
||||
if not corresponding_valid_point.empty and len(corresponding_valid_point.index) == 1:
|
||||
# if we did not rename "_time" to "time" sooner in the query, "_time" would have been renamed to a positionnal name (because it starts with a _, see itertuples() doc), making confusion while reading the code
|
||||
if corresponding_valid_point.iloc[0]["time"] > expired_point_tuple.time:
|
||||
data_frame.drop(expired_point_tuple.Index, inplace=True)
|
||||
else:
|
||||
data_frame.drop(corresponding_valid_point.index, inplace=True)
|
||||
# we do not need the "time" column anymore
|
||||
data_frame.drop(["time"], axis=1, inplace=True)
|
||||
|
||||
# Needed for last known value
|
||||
query_last_known = f"""
|
||||
from(bucket: "{self._bucket}")
|
||||
|
Reference in New Issue
Block a user