diff --git a/influxdb.py b/influxdb.py index 4fb047a..8ae5d51 100644 --- a/influxdb.py +++ b/influxdb.py @@ -166,8 +166,6 @@ class InfluxDataGetter: variable_name_for_query = var_param[0] parameter = "value" if len(var_param) == 1 else var_param[1] - variables_info[variable] = {} - variables_info[variable]["expired_ranges"] = [] query = f""" from(bucket: "{self._bucket}") |> range(start: {times[0]}, stop: {times[1] + 1}) @@ -202,16 +200,26 @@ class InfluxDataGetter: row_to_insert = row except: row_to_insert = row - try: + try: #row_to_insert might be None if not row_to_insert.empty : row_to_insert["timestamp"] = float(times[0]) - data_frame.loc[-1] = row_to_insert + if data_frame.empty: + data_frame = row_to_insert.to_frame().T + else: + data_frame.loc[-1] = row_to_insert except: pass + + if data_frame.empty: + continue + data_frame.drop(["result", "table"], axis=1, inplace=True) data_frame.sort_values(by=["timestamp"], inplace=True) data_frame.reset_index() + variables_info[variable] = {} + variables_info[variable]["expired_ranges"] = [] + # Identify time windows for which the curve is expired for index, row in data_frame.iterrows(): @@ -225,25 +233,27 @@ class InfluxDataGetter: data_frame.drop(["expired"], axis=1, inplace=True) variables_info[variable]["df"] = data_frame res = None - + non_empty_variables = list(variables_info.keys()) + # Merge single curve dataframes to a global one - if len(variables) == 1: - res = variables_info[variables[0]]["df"] - elif len(variables) == 2: - res = merge_ordered(variables_info[variables[0]]["df"], variables_info[variables[1]]["df"], on=["timestamp", "relative"], suffixes=(None, None)) + + if len(non_empty_variables) == 0: + return df() + elif len(non_empty_variables) == 1: + res = variables_info[non_empty_variables[0]]["df"] else : - for i,variable in enumerate(variables): + for i in range(0, len(non_empty_variables)): if i == 1: - res = merge_ordered(variables_info[variables[0]]["df"], variables_info[variables[1]]["df"], on=["timestamp", "relative"], suffixes=(None, None)) + res = merge_ordered(variables_info[non_empty_variables[0]]["df"], variables_info[non_empty_variables[1]]["df"], on=["timestamp", "relative"], suffixes=(None, None)) elif i > 1: - res = merge_ordered(res, variables_info[variables[i]]["df"], on=["timestamp", "relative"], suffixes=(None, None)) + res = merge_ordered(res, variables_info[non_empty_variables[i]]["df"], on=["timestamp", "relative"], suffixes=(None, None)) # Forward fill missing values, then set data points to NaN for those which are expired - if len(variables) > 1: + if len(non_empty_variables) > 1: res.ffill(inplace=True) for variable, info in variables_info.items(): - for range in info["expired_ranges"]: - res.loc[(res["timestamp"] >= range[0]) & ((res["timestamp"] < range[1]) if len(range) == 2 else True), variable] = NaN + for expired_range in info["expired_ranges"]: + res.loc[(res["timestamp"] >= expired_range[0]) & ((res["timestamp"] < expired_range[1]) if len(expired_range) == 2 else True), variable] = NaN # Change order of columns cols = res.columns.tolist()