Fixed insert last known value when df is empty bug
This commit is contained in:
40
influxdb.py
40
influxdb.py
@ -166,8 +166,6 @@ class InfluxDataGetter:
|
|||||||
variable_name_for_query = var_param[0]
|
variable_name_for_query = var_param[0]
|
||||||
parameter = "value" if len(var_param) == 1 else var_param[1]
|
parameter = "value" if len(var_param) == 1 else var_param[1]
|
||||||
|
|
||||||
variables_info[variable] = {}
|
|
||||||
variables_info[variable]["expired_ranges"] = []
|
|
||||||
query = f"""
|
query = f"""
|
||||||
from(bucket: "{self._bucket}")
|
from(bucket: "{self._bucket}")
|
||||||
|> range(start: {times[0]}, stop: {times[1] + 1})
|
|> range(start: {times[0]}, stop: {times[1] + 1})
|
||||||
@ -202,16 +200,26 @@ class InfluxDataGetter:
|
|||||||
row_to_insert = row
|
row_to_insert = row
|
||||||
except:
|
except:
|
||||||
row_to_insert = row
|
row_to_insert = row
|
||||||
try:
|
try: #row_to_insert might be None
|
||||||
if not row_to_insert.empty :
|
if not row_to_insert.empty :
|
||||||
row_to_insert["timestamp"] = float(times[0])
|
row_to_insert["timestamp"] = float(times[0])
|
||||||
data_frame.loc[-1] = row_to_insert
|
if data_frame.empty:
|
||||||
|
data_frame = row_to_insert.to_frame().T
|
||||||
|
else:
|
||||||
|
data_frame.loc[-1] = row_to_insert
|
||||||
except:
|
except:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
if data_frame.empty:
|
||||||
|
continue
|
||||||
|
|
||||||
data_frame.drop(["result", "table"], axis=1, inplace=True)
|
data_frame.drop(["result", "table"], axis=1, inplace=True)
|
||||||
data_frame.sort_values(by=["timestamp"], inplace=True)
|
data_frame.sort_values(by=["timestamp"], inplace=True)
|
||||||
data_frame.reset_index()
|
data_frame.reset_index()
|
||||||
|
|
||||||
|
variables_info[variable] = {}
|
||||||
|
variables_info[variable]["expired_ranges"] = []
|
||||||
|
|
||||||
# Identify time windows for which the curve is expired
|
# Identify time windows for which the curve is expired
|
||||||
|
|
||||||
for index, row in data_frame.iterrows():
|
for index, row in data_frame.iterrows():
|
||||||
@ -225,25 +233,27 @@ class InfluxDataGetter:
|
|||||||
data_frame.drop(["expired"], axis=1, inplace=True)
|
data_frame.drop(["expired"], axis=1, inplace=True)
|
||||||
variables_info[variable]["df"] = data_frame
|
variables_info[variable]["df"] = data_frame
|
||||||
res = None
|
res = None
|
||||||
|
non_empty_variables = list(variables_info.keys())
|
||||||
|
|
||||||
# Merge single curve dataframes to a global one
|
# Merge single curve dataframes to a global one
|
||||||
if len(variables) == 1:
|
|
||||||
res = variables_info[variables[0]]["df"]
|
if len(non_empty_variables) == 0:
|
||||||
elif len(variables) == 2:
|
return df()
|
||||||
res = merge_ordered(variables_info[variables[0]]["df"], variables_info[variables[1]]["df"], on=["timestamp", "relative"], suffixes=(None, None))
|
elif len(non_empty_variables) == 1:
|
||||||
|
res = variables_info[non_empty_variables[0]]["df"]
|
||||||
else :
|
else :
|
||||||
for i,variable in enumerate(variables):
|
for i in range(0, len(non_empty_variables)):
|
||||||
if i == 1:
|
if i == 1:
|
||||||
res = merge_ordered(variables_info[variables[0]]["df"], variables_info[variables[1]]["df"], on=["timestamp", "relative"], suffixes=(None, None))
|
res = merge_ordered(variables_info[non_empty_variables[0]]["df"], variables_info[non_empty_variables[1]]["df"], on=["timestamp", "relative"], suffixes=(None, None))
|
||||||
elif i > 1:
|
elif i > 1:
|
||||||
res = merge_ordered(res, variables_info[variables[i]]["df"], on=["timestamp", "relative"], suffixes=(None, None))
|
res = merge_ordered(res, variables_info[non_empty_variables[i]]["df"], on=["timestamp", "relative"], suffixes=(None, None))
|
||||||
|
|
||||||
# Forward fill missing values, then set data points to NaN for those which are expired
|
# Forward fill missing values, then set data points to NaN for those which are expired
|
||||||
if len(variables) > 1:
|
if len(non_empty_variables) > 1:
|
||||||
res.ffill(inplace=True)
|
res.ffill(inplace=True)
|
||||||
for variable, info in variables_info.items():
|
for variable, info in variables_info.items():
|
||||||
for range in info["expired_ranges"]:
|
for expired_range in info["expired_ranges"]:
|
||||||
res.loc[(res["timestamp"] >= range[0]) & ((res["timestamp"] < range[1]) if len(range) == 2 else True), variable] = NaN
|
res.loc[(res["timestamp"] >= expired_range[0]) & ((res["timestamp"] < expired_range[1]) if len(expired_range) == 2 else True), variable] = NaN
|
||||||
|
|
||||||
# Change order of columns
|
# Change order of columns
|
||||||
cols = res.columns.tolist()
|
cols = res.columns.tolist()
|
||||||
|
Reference in New Issue
Block a user