diff --git a/influxdb.py b/influxdb.py index 6a22f8e..2a16e52 100644 --- a/influxdb.py +++ b/influxdb.py @@ -483,20 +483,38 @@ class InfluxDataGetter: |> range(start: {time[0]}, stop: {time[1] + 1}) |> filter(fn : (r) => r._measurement == "{variable}") |> filter(fn : (r) => r._field == "{parameter+"_float"}") - {"|> aggregateWindow(every: duration(v:"+str(interval)+"), fn: last, createEmpty:false)" if interval else ""} - |> keep(columns: ["_time","_value","expired"]) + {"|> aggregateWindow(every: duration(v:"+str(interval)+"), fn: last, createEmpty:false, timeDst:"+chr(34)+"binning_time"+chr(34)+")" if interval else ""} + |> keep(columns: ["_time","_value","expired"{", "+chr(34)+"binning_time"+chr(34) if interval else ""}]) |> yield(name: "res") """ tables = self._db.query(query) for table in tables: for record in table.records: - t = round(datetime.timestamp(record.get_time()), 3) + t = round(datetime.timestamp(record["binning_time"] if interval else record.get_time()), 3) # t is the real timestamp if no interval is given, or the binned timestamp if interval value = record.get_value() try: value = PrettyFloat(value) except: value = None - raw.append([t, value, record["expired"]]) + point = [t, value, record["expired"]] + if interval: # t is the binning time, we need to add the real time of the point that was used in this interval + point.append(record.get_time()) + raw.append(point) + + if interval: + indexes_to_delete = [] + expired_points = {i:point for i,point in enumerate(raw) if point[2] == "True"} #we need to keep track of the indexes of the expired point in the list + for expired_point_index, expired_point in expired_points.items(): + for i, point in enumerate(raw): + if point[2] == "False" and expired_point[0] == point[0]: # if the current point is expired and has the same binning time as the current expired point + if point[3] > expired_point[3]: # comparison on the real timestamp used. + indexes_to_delete.insert(0, expired_point_index) + else: + indexes_to_delete.insert(0,i) + sorted(indexes_to_delete, reverse=True) #we have to make sure that the list is sorted in reverse to then delete at the given indexes + for index in indexes_to_delete: + del raw[index] + sorted_raw = sorted(raw, key=lambda pair: pair[0]) #expired=True and expired=False are in two Influx tables, so they need to be synchronized res = [] for pair in sorted_raw: